Skip to main content

hydro_test/tutorials/single_client_counter.rs

1use hydro_lang::prelude::*;
2
/// Marker type for the process that hosts the counter service.
///
/// In this file it carries no data and is only used as the tag parameter of
/// `Process<'a, CounterServer>` in the service's stream types.
pub struct CounterServer;
4
5pub fn single_client_counter_service<'a>(
6    increment_requests: Stream<(), Process<'a, CounterServer>>,
7    get_requests: Stream<(), Process<'a, CounterServer>>,
8) -> (
9    Stream<(), Process<'a, CounterServer>>, // increment acknowledgments
10    Stream<usize, Process<'a, CounterServer>>, // get responses
11) {
12    let increment_request_processing = increment_requests.atomic();
13    let current_count = increment_request_processing.clone().count();
14    let increment_ack = increment_request_processing.end_atomic();
15
16    let get_response = sliced! {
17        let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
18        let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));
19
20        request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
21    };
22
23    (increment_ack, get_response)
24}
25
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// A get issued after an increment has been acknowledged must observe it.
    #[test]
    fn test_counter_read_after_write() {
        let mut flow = FlowBuilder::new();
        let server = flow.process();

        // Simulated inputs: one channel per request kind.
        let (inc_tx, inc_stream) = server.sim_input();
        let (get_tx, get_stream) = server.sim_input();

        let (ack_stream, reply_stream) = single_client_counter_service(inc_stream, get_stream);

        // Simulated outputs the test body can await on.
        let ack_rx = ack_stream.sim_output();
        let reply_rx = reply_stream.sim_output();

        flow.sim().exhaustive(async || {
            // Write: send one increment and wait for its acknowledgment.
            inc_tx.send(());
            ack_rx.assert_yields([()]).await;

            // Read: the single response must report exactly one increment.
            get_tx.send(());
            reply_rx.assert_yields_only([1]).await;
        });
    }
}