Skip to main content

hydro_test/tutorials/
keyed_counter_non_atomic.rs

1use hydro_lang::live_collections::stream::{NoOrder, Ordering};
2use hydro_lang::location::{Location, NoTick};
3use hydro_lang::prelude::*;
4
5pub struct CounterServer;
6
7#[expect(clippy::type_complexity, reason = "output types with orderings")]
8pub fn keyed_counter_service_buggy<'a, L: Location<'a> + NoTick, O: Ordering>(
9    increment_requests: KeyedStream<u32, String, L, Unbounded, O>,
10    get_requests: KeyedStream<u32, String, L, Unbounded, O>,
11) -> (
12    KeyedStream<u32, String, L, Unbounded, O>,
13    KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
14) {
15    let current_count = increment_requests
16        .clone()
17        .entries()
18        .map(q!(|(_, key)| (key, ())))
19        .into_keyed()
20        .value_counts();
21    let increment_ack = increment_requests;
22
23    let requests_regrouped = get_requests
24        .entries()
25        .map(q!(|(cid, key)| (key, cid)))
26        .into_keyed();
27
28    let get_lookup = sliced! {
29        let request_batch = use(requests_regrouped, nondet!(/** we never observe batch boundaries */));
30        let count_snapshot = use(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));
31
32        request_batch.join_keyed_singleton(count_snapshot)
33    };
34
35    let get_response = get_lookup
36        .entries()
37        .map(q!(|(key, (client, count))| (client, (key, count))))
38        .into_keyed();
39
40    (increment_ack, get_response)
41}
42
43#[cfg(test)]
44mod tests {
45    use hydro_lang::prelude::*;
46
47    use super::*;
48
49    #[test]
50    #[should_panic]
51    fn test_counter_read_after_write_buggy() {
52        let mut flow = FlowBuilder::new();
53        let process = flow.process::<CounterServer>();
54
55        let (inc_in_port, inc_requests) = process.sim_input();
56        let inc_requests = inc_requests.into_keyed();
57
58        let (get_in_port, get_requests) = process.sim_input();
59        let get_requests = get_requests.into_keyed();
60
61        let (inc_acks, get_responses) = keyed_counter_service_buggy(inc_requests, get_requests);
62
63        let inc_out_port = inc_acks.entries().sim_output();
64        let get_out_port = get_responses.entries().sim_output();
65
66        flow.sim().exhaustive(async || {
67            inc_in_port.send((1, "abc".to_owned()));
68            inc_out_port
69                .assert_yields_unordered([(1, "abc".to_owned())])
70                .await;
71            get_in_port.send((1, "abc".to_owned()));
72            get_out_port
73                .assert_yields_only_unordered([(1, ("abc".to_owned(), 1))])
74                .await;
75        });
76    }
77
78    #[test]
79    fn test_counter_read_after_write_instances() {
80        let mut flow = FlowBuilder::new();
81        let process = flow.process::<CounterServer>();
82
83        let (inc_in_port, inc_requests) = process.sim_input();
84        let inc_requests = inc_requests.into_keyed();
85
86        let (get_in_port, get_requests) = process.sim_input();
87        let get_requests = get_requests.into_keyed();
88
89        let (inc_acks, get_responses) = keyed_counter_service_buggy(inc_requests, get_requests);
90
91        let inc_out_port = inc_acks.entries().sim_output();
92        let get_out_port = get_responses.entries().sim_output();
93
94        let instances = flow.sim().exhaustive(async || {
95            inc_in_port.send((1, "abc".to_owned()));
96            inc_out_port
97                .assert_yields_unordered([(1, "abc".to_owned())])
98                .await;
99            get_in_port.send((1, "abc".to_owned()));
100            let _ = get_out_port.collect_sorted::<Vec<_>>().await;
101        });
102
103        assert_eq!(instances, 3);
104    }
105}