hydro_test/tutorials/
keyed_counter_non_atomic.rs1use hydro_lang::live_collections::stream::{NoOrder, Ordering};
2use hydro_lang::location::{Location, NoTick};
3use hydro_lang::prelude::*;
4
/// Marker type naming the process that hosts the counter service; the tests in
/// this file pass it as the type argument to `FlowBuilder::process`.
pub struct CounterServer;
6
7#[expect(clippy::type_complexity, reason = "output types with orderings")]
8pub fn keyed_counter_service_buggy<'a, L: Location<'a> + NoTick, O: Ordering>(
9 increment_requests: KeyedStream<u32, String, L, Unbounded, O>,
10 get_requests: KeyedStream<u32, String, L, Unbounded, O>,
11) -> (
12 KeyedStream<u32, String, L, Unbounded, O>,
13 KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
14) {
15 let current_count = increment_requests
16 .clone()
17 .entries()
18 .map(q!(|(_, key)| (key, ())))
19 .into_keyed()
20 .value_counts();
21 let increment_ack = increment_requests;
22
23 let requests_regrouped = get_requests
24 .entries()
25 .map(q!(|(cid, key)| (key, cid)))
26 .into_keyed();
27
28 let get_lookup = sliced! {
29 let request_batch = use(requests_regrouped, nondet!());
30 let count_snapshot = use(current_count, nondet!());
31
32 request_batch.join_keyed_singleton(count_snapshot)
33 };
34
35 let get_response = get_lookup
36 .entries()
37 .map(q!(|(key, (client, count))| (client, (key, count))))
38 .into_keyed();
39
40 (increment_ack, get_response)
41}
42
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Sends one increment, waits for its ack, then issues a get and demands
    /// that the only response is a count of 1.
    ///
    /// Marked `#[should_panic]`: the exhaustive simulation is expected to hit
    /// an execution in which that assertion does not hold.
    #[test]
    #[should_panic]
    fn test_counter_read_after_write_buggy() {
        let mut flow = FlowBuilder::new();
        let server = flow.process::<CounterServer>();

        // Simulated inputs, each turned into a keyed stream.
        let (inc_tx, raw_increments) = server.sim_input();
        let increments = raw_increments.into_keyed();

        let (get_tx, raw_gets) = server.sim_input();
        let gets = raw_gets.into_keyed();

        let (acks, responses) = keyed_counter_service_buggy(increments, gets);

        // Simulated outputs observing the flattened (key, value) entries.
        let ack_rx = acks.entries().sim_output();
        let response_rx = responses.entries().sim_output();

        flow.sim().exhaustive(async || {
            // Write, and wait until the write has been acknowledged.
            inc_tx.send((1, "abc".to_owned()));
            ack_rx
                .assert_yields_unordered([(1, "abc".to_owned())])
                .await;

            // Read after the acknowledged write.
            get_tx.send((1, "abc".to_owned()));
            response_rx
                .assert_yields_only_unordered([(1, ("abc".to_owned(), 1))])
                .await;
        });
    }

    /// Runs the same scenario without asserting on the get responses and pins
    /// the value returned by the exhaustive simulation to 3.
    #[test]
    fn test_counter_read_after_write_instances() {
        let mut flow = FlowBuilder::new();
        let server = flow.process::<CounterServer>();

        let (inc_tx, raw_increments) = server.sim_input();
        let increments = raw_increments.into_keyed();

        let (get_tx, raw_gets) = server.sim_input();
        let gets = raw_gets.into_keyed();

        let (acks, responses) = keyed_counter_service_buggy(increments, gets);

        let ack_rx = acks.entries().sim_output();
        let response_rx = responses.entries().sim_output();

        let explored = flow.sim().exhaustive(async || {
            inc_tx.send((1, "abc".to_owned()));
            ack_rx
                .assert_yields_unordered([(1, "abc".to_owned())])
                .await;

            get_tx.send((1, "abc".to_owned()));
            // Drain whatever comes back; the contents are deliberately ignored.
            let _ = response_rx.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(explored, 3);
    }
}
105}