hydro_test/tutorials/
concurrent_clients.rs1use hydro_lang::prelude::*;
2
3pub struct CounterServer;
4
5#[expect(clippy::type_complexity, reason = "multiple outputs")]
6pub fn concurrent_counter_service<'a>(
7 increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
8 get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
9) -> (
10 KeyedStream<u32, (), Process<'a, CounterServer>>, KeyedStream<u32, usize, Process<'a, CounterServer>>, ) {
13 let increment_request_processing = increment_requests.atomic();
14 let current_count = increment_request_processing.clone().values().count();
15 let increment_ack = increment_request_processing.end_atomic();
16
17 let get_response = sliced! {
18 let request_batch = use(get_requests, nondet!());
19 let count_snapshot = use::atomic(current_count, nondet!());
20
21 request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
22 };
23
24 (increment_ack, get_response)
25}
26
27#[cfg(test)]
28mod tests {
29 use hydro_lang::prelude::*;
30
31 use super::*;
32
33 #[test]
34 fn test_concurrent_clients() {
35 let mut flow = FlowBuilder::new();
36 let process = flow.process();
37
38 let (inc_in_port, inc_requests) = process.sim_input();
39 let inc_requests = inc_requests.into_keyed();
40
41 let (get_in_port, get_requests) = process.sim_input();
42 let get_requests = get_requests.into_keyed();
43
44 let (inc_acks, get_responses) = concurrent_counter_service(inc_requests, get_requests);
45
46 let inc_out_port = inc_acks.entries().sim_output();
47 let get_out_port = get_responses.entries().sim_output();
48
49 flow.sim().exhaustive(async || {
50 inc_in_port.send((1, ()));
52 inc_out_port.assert_yields_unordered([(1, ())]).await;
53
54 inc_in_port.send((2, ()));
56 inc_out_port.assert_yields_unordered([(2, ())]).await;
57
58 get_in_port.send((1, ()));
60 get_out_port.assert_yields_unordered([(1, 2)]).await;
61
62 get_in_port.send((2, ()));
64 get_out_port.assert_yields_unordered([(2, 2)]).await;
65 });
66 }
67}