Skip to main content

hydro_test/tutorials/keyed_counter.rs

1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::location::{Location, NoTick};
3use hydro_lang::prelude::*;
4
/// Marker type naming the process that hosts the keyed counter service.
///
/// It carries no data; the test in this file passes it as the type tag to
/// `FlowBuilder::process`.
pub struct CounterServer;
6
7#[expect(clippy::type_complexity, reason = "output types with orderings")]
8pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
9    increment_requests: KeyedStream<u32, String, L, Unbounded>,
10    get_requests: KeyedStream<u32, String, L, Unbounded>,
11) -> (
12    KeyedStream<u32, String, L, Unbounded>,
13    KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
14) {
15    let increment_request_processing = increment_requests.atomic();
16    let current_count = increment_request_processing
17        .clone()
18        .entries()
19        .map(q!(|(_, key)| (key, ())))
20        .into_keyed()
21        .value_counts();
22    let increment_ack = increment_request_processing.end_atomic();
23
24    let requests_regrouped = get_requests
25        .entries()
26        .map(q!(|(cid, key)| (key, cid)))
27        .into_keyed();
28
29    let get_lookup = sliced! {
30        let request_batch = use(requests_regrouped, nondet!(/** we never observe batch boundaries */));
31        let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));
32
33        request_batch.join_keyed_singleton(count_snapshot)
34    };
35
36    let get_response = get_lookup
37        .entries()
38        .map(q!(|(key, (client, count))| (client, (key, count))))
39        .into_keyed();
40
41    (increment_ack, get_response)
42}
43
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Checks read-after-write: once the acknowledgement for an increment of
    /// `"abc"` has been observed, a get for `"abc"` from the same client must
    /// yield exactly one response reporting a count of 1.
    #[test]
    fn test_counter_read_after_write() {
        let mut flow = FlowBuilder::new();
        let server = flow.process::<CounterServer>();

        // Simulated input ports feeding the two request streams.
        let (increment_tx, raw_increments) = server.sim_input();
        let keyed_increments = raw_increments.into_keyed();

        let (get_tx, raw_gets) = server.sim_input();
        let keyed_gets = raw_gets.into_keyed();

        let (acks, responses) = keyed_counter_service(keyed_increments, keyed_gets);

        // Simulated output ports observing what the service emits.
        let ack_rx = acks.entries().sim_output();
        let response_rx = responses.entries().sim_output();

        flow.sim().exhaustive(async || {
            // Write, then wait for the acknowledgement before reading.
            increment_tx.send((1, String::from("abc")));
            ack_rx
                .assert_yields_unordered([(1, String::from("abc"))])
                .await;

            get_tx.send((1, String::from("abc")));
            response_rx
                .assert_yields_only_unordered([(1, (String::from("abc"), 1))])
                .await;
        });
    }
}