hydro_test/tutorials/
single_client_counter.rs1use hydro_lang::prelude::*;
2
3pub struct CounterServer;
4
5pub fn single_client_counter_service<'a>(
6 increment_requests: Stream<(), Process<'a, CounterServer>>,
7 get_requests: Stream<(), Process<'a, CounterServer>>,
8) -> (
9 Stream<(), Process<'a, CounterServer>>, Stream<usize, Process<'a, CounterServer>>, ) {
12 let increment_request_processing = increment_requests.atomic();
13 let current_count = increment_request_processing.clone().count();
14 let increment_ack = increment_request_processing.end_atomic();
15
16 let get_response = sliced! {
17 let request_batch = use(get_requests, nondet!());
18 let count_snapshot = use::atomic(current_count, nondet!());
19
20 request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
21 };
22
23 (increment_ack, get_response)
24}
25
26#[cfg(test)]
27mod tests {
28 use hydro_lang::prelude::*;
29
30 use super::*;
31
32 #[test]
33 fn test_counter_read_after_write() {
34 let mut flow = FlowBuilder::new();
35 let process = flow.process();
36
37 let (inc_in_port, inc_requests) = process.sim_input();
38 let (get_in_port, get_requests) = process.sim_input();
39
40 let (inc_acks, get_responses) = single_client_counter_service(inc_requests, get_requests);
41
42 let inc_out_port = inc_acks.sim_output();
43 let get_out_port = get_responses.sim_output();
44
45 flow.sim().exhaustive(async || {
46 inc_in_port.send(());
47 inc_out_port.assert_yields([()]).await;
48 get_in_port.send(());
49 get_out_port.assert_yields_only([1]).await;
50 });
51 }
52}