Skip to main content

hydro_test/tutorials/
concurrent_clients.rs

1use hydro_lang::prelude::*;
2
/// Marker type naming the process that hosts the counter service.
///
/// It carries no data; in this file it only appears as the type parameter of
/// `Process<'a, CounterServer>` to tag which process a stream lives on.
pub struct CounterServer;
4
5#[expect(clippy::type_complexity, reason = "multiple outputs")]
6pub fn concurrent_counter_service<'a>(
7    increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
8    get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
9) -> (
10    KeyedStream<u32, (), Process<'a, CounterServer>>, // increment acknowledgments
11    KeyedStream<u32, usize, Process<'a, CounterServer>>, // get responses
12) {
13    let increment_request_processing = increment_requests.atomic();
14    let current_count = increment_request_processing.clone().values().count();
15    let increment_ack = increment_request_processing.end_atomic();
16
17    let get_response = sliced! {
18        let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
19        let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));
20
21        request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
22    };
23
24    (increment_ack, get_response)
25}
26
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Drives the counter service through the simulator with two clients:
    /// each increments once and is acknowledged, then each reads and is
    /// expected to observe a count of 2.
    #[test]
    fn test_concurrent_clients() {
        let mut flow = FlowBuilder::new();
        let server = flow.process();

        // Simulated input carrying increment requests, keyed by client id.
        let (increment_tx, raw_increments) = server.sim_input();
        let keyed_increments = raw_increments.into_keyed();

        // Simulated input carrying get requests, keyed by client id.
        let (get_tx, raw_gets) = server.sim_input();
        let keyed_gets = raw_gets.into_keyed();

        let (acks, responses) = concurrent_counter_service(keyed_increments, keyed_gets);

        // Flatten the keyed outputs to (key, value) entries for observation.
        let ack_rx = acks.entries().sim_output();
        let response_rx = responses.entries().sim_output();

        flow.sim().exhaustive(async || {
            // First increment comes from client 1 and must be acknowledged.
            increment_tx.send((1, ()));
            ack_rx.assert_yields_unordered([(1, ())]).await;

            // Second increment comes from client 2 and must be acknowledged.
            increment_tx.send((2, ()));
            ack_rx.assert_yields_unordered([(2, ())]).await;

            // Client 1 reads after both acks, so the count must be 2.
            get_tx.send((1, ()));
            response_rx.assert_yields_unordered([(1, 2)]).await;

            // Client 2 reads next and must see the same count of 2.
            get_tx.send((2, ()));
            response_rx.assert_yields_unordered([(2, 2)]).await;
        });
    }
}