Skip to main content

hydro_test/tutorials/
single_counter.rs

1use hydro_lang::prelude::*;
2
3pub struct CounterServer;
4
5#[expect(clippy::type_complexity, reason = "multiple outputs")]
6pub fn single_counter_service<'a>(
7    increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
8    get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
9) -> (
10    KeyedStream<u32, (), Process<'a, CounterServer>>, // increment acknowledgments
11    KeyedStream<u32, usize, Process<'a, CounterServer>>, // get responses
12) {
13    let increment_request_processing = increment_requests.atomic();
14    let current_count = increment_request_processing.clone().values().count();
15    let increment_ack = increment_request_processing.end_atomic();
16
17    let get_response = sliced! {
18        let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
19        let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));
20
21        request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
22    };
23
24    (increment_ack, get_response)
25}
26
27#[cfg(test)]
28mod tests {
29    use hydro_lang::prelude::*;
30
31    use super::*;
32
33    #[test]
34    fn test_counter_read_after_write() {
35        let mut flow = FlowBuilder::new();
36        let process = flow.process();
37
38        let (inc_in_port, inc_requests) = process.sim_input();
39        let inc_requests = inc_requests.into_keyed();
40
41        let (get_in_port, get_requests) = process.sim_input();
42        let get_requests = get_requests.into_keyed();
43
44        let (inc_acks, get_responses) = single_counter_service(inc_requests, get_requests);
45
46        let inc_out_port = inc_acks.entries().sim_output();
47        let get_out_port = get_responses.entries().sim_output();
48
49        flow.sim().exhaustive(async || {
50            inc_in_port.send((1, ()));
51            inc_out_port.assert_yields_unordered([(1, ())]).await;
52            get_in_port.send((1, ()));
53            get_out_port.assert_yields_only_unordered([(1, 1)]).await;
54        });
55    }
56}