Skip to main content

hydro_test/tutorials/
partitioned_counter.rs

1use std::hash::{DefaultHasher, Hash, Hasher};
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::MemberId;
5use hydro_lang::prelude::*;
6
/// Type tag for the single process location (`Process<'a, CounterServer>`) on which
/// requests enter and results leave `sharded_counter_service`.
pub struct CounterServer;
/// Type tag for the cluster location (`Cluster<'a, CounterShard>`) to which
/// `sharded_counter_service` routes requests.
pub struct CounterShard;
9
10#[expect(clippy::type_complexity, reason = "output types with orderings")]
11pub fn sharded_counter_service<'a>(
12    leader: &Process<'a, CounterServer>,
13    shard_servers: &Cluster<'a, CounterShard>,
14    increment_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
15    get_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
16) -> (
17    KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded, NoOrder>,
18    KeyedStream<u32, (String, usize), Process<'a, CounterServer>, Unbounded, NoOrder>,
19) {
20    let sharded_increment_requests = increment_requests
21        .prefix_key(q!(|(_client, key)| {
22            let mut hasher = DefaultHasher::new();
23            key.hash(&mut hasher);
24            MemberId::from_raw_id(hasher.finish() as u32 % 5)
25        }))
26        .demux(shard_servers, TCP.fail_stop().bincode());
27
28    let sharded_get_requests = get_requests
29        .prefix_key(q!(|(_client, key)| {
30            let mut hasher = DefaultHasher::new();
31            key.hash(&mut hasher);
32            MemberId::from_raw_id(hasher.finish() as u32 % 5)
33        }))
34        .demux(shard_servers, TCP.fail_stop().bincode());
35
36    let (sharded_increment_ack, sharded_get_response) = super::keyed_counter::keyed_counter_service(
37        sharded_increment_requests,
38        sharded_get_requests,
39    );
40
41    let increment_ack = sharded_increment_ack
42        .send(leader, TCP.fail_stop().bincode())
43        .drop_key_prefix();
44
45    let get_response = sharded_get_response
46        .send(leader, TCP.fail_stop().bincode())
47        .drop_key_prefix();
48
49    (increment_ack, get_response)
50}
51
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// Sends one increment for key `"abc"` with stream key `1`, waits for its acknowledgement,
    /// then issues a get for the same key and expects exactly one response carrying `1`.
    /// Runs under the exhaustive simulator with a five-member shard cluster.
    #[test]
    fn test_counter_read_after_write() {
        let mut builder = FlowBuilder::new();
        let leader = builder.process();
        let shard_cluster = builder.cluster();

        // Simulated input for increment requests, viewed as a keyed stream.
        let (inc_tx, inc_stream) = leader.sim_input();
        let inc_stream = inc_stream.into_keyed();

        // Simulated input for get requests, viewed as a keyed stream.
        let (get_tx, get_stream) = leader.sim_input();
        let get_stream = get_stream.into_keyed();

        let (acks, reads) =
            sharded_counter_service(&leader, &shard_cluster, inc_stream, get_stream);

        let ack_rx = acks.entries().sim_output();
        let read_rx = reads.entries().sim_output();

        builder
            .sim()
            // Five members, matching the `% 5` routing in `sharded_counter_service`.
            .with_cluster_size(&shard_cluster, 5)
            .exhaustive(async || {
                inc_tx.send((1, String::from("abc")));
                ack_rx
                    .assert_yields_unordered([(1, String::from("abc"))])
                    .await;

                // The get is only sent after the increment has been acknowledged.
                get_tx.send((1, String::from("abc")));
                read_rx
                    .assert_yields_only_unordered([(1, (String::from("abc"), 1))])
                    .await;
            });
    }
}