Skip to main content

// hydro_test/tutorials/single_counter_buggy.rs

1use hydro_lang::prelude::*;
2
/// Marker type naming the process that hosts the counter service.
///
/// It carries no data; it is only used as the tag in
/// `Process<'a, CounterServer>` so the service's streams are typed to that
/// location.
pub struct CounterServer;
4
5// buggy version which does not guarantee consistent reads after increment acks
6#[expect(clippy::type_complexity, reason = "multiple outputs")]
7pub fn single_counter_service_buggy<'a>(
8    increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
9    get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
10) -> (
11    KeyedStream<u32, (), Process<'a, CounterServer>>,
12    KeyedStream<u32, usize, Process<'a, CounterServer>>,
13) {
14    let current_count = increment_requests.clone().values().count();
15    let increment_ack = increment_requests;
16
17    let get_response = sliced! {
18        let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
19        let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
20
21        request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
22    };
23
24    (increment_ack, get_response)
25}
26
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Shows that the buggy service violates read-after-write consistency.
    ///
    /// The scenario sends one increment, waits for its ack, then issues a get
    /// and asserts that the response is exactly `1`. The test is marked
    /// `#[should_panic]` because the service is known to be buggy, so that
    /// assertion is expected to fail.
    #[test]
    #[should_panic]
    fn test_buggy_counter_read_after_write() {
        let mut flow = FlowBuilder::new();
        let process = flow.process();

        // Simulated input for increment requests, converted to a keyed stream.
        let (inc_in_port, inc_requests) = process.sim_input();
        let inc_requests = inc_requests.into_keyed();

        // Simulated input for get requests, converted to a keyed stream.
        let (get_in_port, get_requests) = process.sim_input();
        let get_requests = get_requests.into_keyed();

        let (inc_acks, get_responses) = single_counter_service_buggy(inc_requests, get_requests);

        // Flatten the keyed outputs to `(key, value)` entries so the test can observe them.
        let inc_out_port = inc_acks.entries().sim_output();
        let get_out_port = get_responses.entries().sim_output();

        // NOTE(review): `exhaustive` presumably explores the simulator's possible
        // executions; confirm against the Hydro simulator documentation.
        flow.sim().exhaustive(async || {
            inc_in_port.send((1, ()));
            // Wait for the ack before reading, so the get is issued after the write was acknowledged.
            inc_out_port.assert_yields_unordered([(1, ())]).await;
            get_in_port.send((1, ()));
            // A consistent service would always answer 1 here; the buggy one may not.
            get_out_port.assert_yields_only_unordered([(1, 1)]).await;
        });
    }
}