// hydro_test/tutorials/single_counter_buggy.rs
use hydro_lang::prelude::*;
2
/// Marker type naming the process that hosts the counter; it appears only as
/// the tag in `Process<'a, CounterServer>` and carries no data.
pub struct CounterServer;
4
5#[expect(clippy::type_complexity, reason = "multiple outputs")]
7pub fn single_counter_service_buggy<'a>(
8 increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
9 get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
10) -> (
11 KeyedStream<u32, (), Process<'a, CounterServer>>,
12 KeyedStream<u32, usize, Process<'a, CounterServer>>,
13) {
14 let current_count = increment_requests.clone().values().count();
15 let increment_ack = increment_requests;
16
17 let get_response = sliced! {
18 let request_batch = use(get_requests, nondet!());
19 let count_snapshot = use(current_count, nondet!());
20
21 request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
22 };
23
24 (increment_ack, get_response)
25}
26
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Sends one increment, waits for its acknowledgement, then issues a get
    /// and asserts that the response carries a count of 1.
    ///
    /// Marked `#[should_panic]`: the test passes only if the simulation hits a
    /// failing assertion, i.e. the buggy service is expected to violate
    /// read-after-write in at least one explored execution.
    #[test]
    #[should_panic]
    fn test_buggy_counter_read_after_write() {
        let mut builder = FlowBuilder::new();
        let server = builder.process();

        // Simulated input feeding increment requests as (key, ()) entries.
        let (increment_tx, increment_stream) = server.sim_input();
        let increment_stream = increment_stream.into_keyed();

        // Simulated input feeding get requests as (key, ()) entries.
        let (get_tx, get_stream) = server.sim_input();
        let get_stream = get_stream.into_keyed();

        let (ack_stream, response_stream) =
            single_counter_service_buggy(increment_stream, get_stream);

        // Flatten both keyed outputs to (key, value) entries for observation.
        let ack_rx = ack_stream.entries().sim_output();
        let response_rx = response_stream.entries().sim_output();

        builder.sim().exhaustive(async || {
            // Write: increment under key 1 and wait for the acknowledgement.
            increment_tx.send((1, ()));
            ack_rx.assert_yields_unordered([(1, ())]).await;

            // Read: a get under key 1 should see exactly one increment.
            get_tx.send((1, ()));
            response_rx.assert_yields_only_unordered([(1, 1)]).await;
        });
    }
}