// hydro_test/tutorials/partitioned_counter.rs
use std::hash::{DefaultHasher, Hash, Hasher};
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::MemberId;
5use hydro_lang::prelude::*;
6
/// Marker type tagging the leader [`Process`] that receives client requests
/// and collects the replies in [`sharded_counter_service`].
pub struct CounterServer;
/// Marker type tagging the [`Cluster`] of shard servers that
/// [`sharded_counter_service`] routes requests to.
pub struct CounterShard;
9
10#[expect(clippy::type_complexity, reason = "output types with orderings")]
11pub fn sharded_counter_service<'a>(
12 leader: &Process<'a, CounterServer>,
13 shard_servers: &Cluster<'a, CounterShard>,
14 increment_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
15 get_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
16) -> (
17 KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded, NoOrder>,
18 KeyedStream<u32, (String, usize), Process<'a, CounterServer>, Unbounded, NoOrder>,
19) {
20 let sharded_increment_requests = increment_requests
21 .prefix_key(q!(|(_client, key)| {
22 let mut hasher = DefaultHasher::new();
23 key.hash(&mut hasher);
24 MemberId::from_raw_id(hasher.finish() as u32 % 5)
25 }))
26 .demux(shard_servers, TCP.fail_stop().bincode());
27
28 let sharded_get_requests = get_requests
29 .prefix_key(q!(|(_client, key)| {
30 let mut hasher = DefaultHasher::new();
31 key.hash(&mut hasher);
32 MemberId::from_raw_id(hasher.finish() as u32 % 5)
33 }))
34 .demux(shard_servers, TCP.fail_stop().bincode());
35
36 let (sharded_increment_ack, sharded_get_response) = super::keyed_counter::keyed_counter_service(
37 sharded_increment_requests,
38 sharded_get_requests,
39 );
40
41 let increment_ack = sharded_increment_ack
42 .send(leader, TCP.fail_stop().bincode())
43 .drop_key_prefix();
44
45 let get_response = sharded_get_response
46 .send(leader, TCP.fail_stop().bincode())
47 .drop_key_prefix();
48
49 (increment_ack, get_response)
50}
51
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// After an increment for key `"abc"` has been acknowledged, a get for
    /// the same key must report a count of 1.
    #[test]
    fn test_counter_read_after_write() {
        let mut builder = FlowBuilder::new();
        let leader = builder.process();
        let shard_cluster = builder.cluster();

        // Simulated input for increment requests, keyed by the first tuple element.
        let (increment_tx, increment_stream) = leader.sim_input();
        let increment_stream = increment_stream.into_keyed();

        // Simulated input for get requests, keyed the same way.
        let (get_tx, get_stream) = leader.sim_input();
        let get_stream = get_stream.into_keyed();

        let (acks, responses) =
            sharded_counter_service(&leader, &shard_cluster, increment_stream, get_stream);

        let ack_rx = acks.entries().sim_output();
        let response_rx = responses.entries().sim_output();

        // The cluster size matches the modulus used by the routing closures.
        builder
            .sim()
            .with_cluster_size(&shard_cluster, 5)
            .exhaustive(async || {
                // Write first, and wait for the acknowledgement...
                increment_tx.send((1, "abc".to_owned()));
                ack_rx
                    .assert_yields_unordered([(1, "abc".to_owned())])
                    .await;
                // ...then read the same key back and expect the count to be 1.
                get_tx.send((1, "abc".to_owned()));
                response_rx
                    .assert_yields_only_unordered([(1, ("abc".to_owned(), 1))])
                    .await;
            });
    }
}