hydro_test/tutorials/
keyed_counter.rs1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::location::{Location, NoTick};
3use hydro_lang::prelude::*;
4
5pub struct CounterServer;
6
7#[expect(clippy::type_complexity, reason = "output types with orderings")]
8pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
9 increment_requests: KeyedStream<u32, String, L, Unbounded>,
10 get_requests: KeyedStream<u32, String, L, Unbounded>,
11) -> (
12 KeyedStream<u32, String, L, Unbounded>,
13 KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
14) {
15 let increment_request_processing = increment_requests.atomic();
16 let current_count = increment_request_processing
17 .clone()
18 .entries()
19 .map(q!(|(_, key)| (key, ())))
20 .into_keyed()
21 .value_counts();
22 let increment_ack = increment_request_processing.end_atomic();
23
24 let requests_regrouped = get_requests
25 .entries()
26 .map(q!(|(cid, key)| (key, cid)))
27 .into_keyed();
28
29 let get_lookup = sliced! {
30 let request_batch = use(requests_regrouped, nondet!());
31 let count_snapshot = use::atomic(current_count, nondet!());
32
33 request_batch.join_keyed_singleton(count_snapshot)
34 };
35
36 let get_response = get_lookup
37 .entries()
38 .map(q!(|(key, (client, count))| (client, (key, count))))
39 .into_keyed();
40
41 (increment_ack, get_response)
42}
43
44#[cfg(test)]
45mod tests {
46 use hydro_lang::prelude::*;
47
48 use super::*;
49
50 #[test]
51 fn test_counter_read_after_write() {
52 let mut flow = FlowBuilder::new();
53 let process = flow.process::<CounterServer>();
54
55 let (inc_in_port, inc_requests) = process.sim_input();
56 let inc_requests = inc_requests.into_keyed();
57
58 let (get_in_port, get_requests) = process.sim_input();
59 let get_requests = get_requests.into_keyed();
60
61 let (inc_acks, get_responses) = keyed_counter_service(inc_requests, get_requests);
62
63 let inc_out_port = inc_acks.entries().sim_output();
64 let get_out_port = get_responses.entries().sim_output();
65
66 flow.sim().exhaustive(async || {
67 inc_in_port.send((1, "abc".to_owned()));
68 inc_out_port
69 .assert_yields_unordered([(1, "abc".to_owned())])
70 .await;
71 get_in_port.send((1, "abc".to_owned()));
72 get_out_port
73 .assert_yields_only_unordered([(1, ("abc".to_owned(), 1))])
74 .await;
75 });
76 }
77}