// hydro_std/quorum.rs

use std::hash::Hash;

use hydro_lang::live_collections::stream::{NoOrder, Ordering};
use hydro_lang::location::{Location, NoTick};
use hydro_lang::prelude::*;

7#[expect(clippy::type_complexity, reason = "stream types with ordering")]
8pub fn collect_quorum_with_response<
9    'a,
10    L: Location<'a> + NoTick,
11    Order: Ordering,
12    K: Clone + Eq + Hash,
13    V: Clone,
14    E: Clone,
15>(
16    responses: Stream<(K, Result<V, E>), L, Unbounded, Order>,
17    min: usize,
18    max: usize,
19) -> (
20    Stream<(K, V), L, Unbounded, Order>,
21    Stream<(K, E), L, Unbounded, Order>,
22) {
23    let quorums = sliced! {
24        let new_inputs = use(responses.clone(), nondet!(
25            /// We always persist values that have not reached quorum, so even
26            /// with arbitrary batching we always produce deterministic quorum results.
27        ));
28
29        let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
30        let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
31
32        let current_responses = not_all.chain(new_inputs);
33
34        let count_per_key = current_responses.clone().into_keyed().fold(
35            q!(move || (0, 0)),
36            q!(move |accum, value| {
37                if value.is_ok() {
38                    accum.0 += 1;
39                } else {
40                    accum.1 += 1;
41                }
42            }, commutative = manual_proof!(/** increment counters is commutative */)),
43        );
44
45         let not_reached_min_count = count_per_key
46            .clone()
47            .filter(q!(move |(success, _error)| success < &min))
48            .keys();
49
50        let reached_min_count = count_per_key
51            .clone()
52            .filter(q!(move |(success, _error)| success >= &min))
53            .keys();
54
55        let just_reached_quorum = if max == min {
56            not_all = current_responses.clone().anti_join(reached_min_count);
57
58            current_responses.anti_join(not_reached_min_count)
59        } else {
60            let received_from_all = count_per_key
61                .filter(q!(move |(success, error)| (success + error) >= max))
62                .keys();
63
64            not_all = current_responses.clone().anti_join(received_from_all.clone());
65
66            let out = current_responses
67                .anti_join(not_reached_min_count)
68                .anti_join(min_but_not_max);
69
70            min_but_not_max = reached_min_count.filter_not_in(received_from_all);
71
72            out
73        };
74
75        just_reached_quorum.filter_map(q!(move |(key, res)| match res {
76            Ok(v) => Some((key, v)),
77            Err(_) => None,
78        }))
79    };
80
81    (
82        quorums,
83        responses.filter_map(q!(move |(key, res)| match res {
84            Ok(_) => None,
85            Err(e) => Some((key, e)),
86        })),
87    )
88}
89
90#[expect(clippy::type_complexity, reason = "stream types with ordering")]
91pub fn collect_quorum<
92    'a,
93    L: Location<'a> + NoTick,
94    Order: Ordering,
95    K: Clone + Eq + Hash,
96    E: Clone,
97>(
98    responses: Stream<(K, Result<(), E>), L, Unbounded, Order>,
99    min: usize,
100    max: usize,
101) -> (
102    Stream<K, L, Unbounded, NoOrder>,
103    Stream<(K, E), L, Unbounded, Order>,
104) {
105    let just_reached_quorum = sliced! {
106        let new_inputs = use(responses.clone(), nondet!(
107            /// We always persist values that have not reached quorum, so even
108            /// with arbitrary batching we always produce deterministic quorum results.
109        ));
110
111        let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
112        let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
113
114        let current_responses = not_all.chain(new_inputs);
115
116        let count_per_key = current_responses.clone().into_keyed().fold(
117            q!(move || (0, 0)),
118            q!(move |accum, value| {
119                if value.is_ok() {
120                    accum.0 += 1;
121                } else {
122                    accum.1 += 1;
123                }
124            }, commutative = manual_proof!(/** increment counters is commutative */)),
125        );
126
127        let reached_min_count = count_per_key
128            .clone()
129            .entries()
130            .filter_map(q!(move |(key, (success, _error))| if success >= min {
131                Some(key)
132            } else {
133                None
134            }));
135
136        let just_reached_quorum = if max == min {
137            not_all = current_responses.anti_join(reached_min_count.clone());
138
139            reached_min_count
140        } else {
141            let received_from_all = count_per_key
142                .filter(q!(move |(success, error)| (success + error) >= max))
143                .keys();
144
145            not_all = current_responses.anti_join(received_from_all.clone());
146
147            let out = reached_min_count.clone().filter_not_in(min_but_not_max);
148
149            min_but_not_max = reached_min_count.filter_not_in(received_from_all);
150
151            out
152        };
153
154        just_reached_quorum
155    };
156
157    (
158        just_reached_quorum,
159        responses.filter_map(q!(move |(key, res)| match res {
160            Ok(_) => None,
161            Err(e) => Some((key, e)),
162        })),
163    )
164}
165
#[cfg(test)]
mod tests {
    use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
    use hydro_lang::prelude::*;

    use super::{collect_quorum, collect_quorum_with_response};

    // With an ordered input, released responses keep their input order and
    // key 2 (only two of three successes) is never released.
    #[test]
    fn collect_quorum_with_response_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let out_recv = collect_quorum_with_response(input, 3, 3).0.sim_output();

        flow.sim().exhaustive(async || {
            in_send.send((1, Ok::<(), ()>(())));
            in_send.send((1, Ok(())));
            in_send.send((1, Ok(())));
            in_send.send((2, Ok(())));
            in_send.send((2, Ok(())));
            in_send.send((3, Ok(())));
            in_send.send((3, Ok(())));
            in_send.send((3, Ok(())));

            assert_eq!(
                out_recv.collect::<Vec<_>>().await,
                vec![(1, ()), (1, ()), (1, ()), (3, ()), (3, ()), (3, ())]
            )
        });
    }

    // With an unordered input, the same set of responses is released
    // regardless of arrival order; key 2 (one success) is never released.
    #[test]
    fn collect_quorum_with_response_no_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let out_recv = collect_quorum_with_response(input, 2, 2).0.sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([
                (1, Ok::<(), ()>(())),
                (1, Ok(())),
                (2, Ok(())),
                (3, Ok(())),
                (3, Ok(())),
            ]);

            out_recv
                .assert_yields_only_unordered([(1, ()), (1, ()), (3, ()), (3, ())])
                .await;
        });
    }

    #[test]
    fn collect_quorum_functionality() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let (success_recv, error_recv) = {
            let (success, error) = collect_quorum(input, 2, 3);
            (success.sim_output(), error.sim_output())
        };

        let compiled_sim = flow.sim().compiled();

        // Test case 1: Key reaches exact minimum quorum (2/3)
        compiled_sim.exhaustive(async || {
            in_send.send((1, Ok::<(), ()>(())));
            in_send.send((1, Ok(())));

            success_recv.assert_yields_only_unordered([1]).await;
            error_recv.assert_no_more().await;
        });

        // Test case 2: Key reaches maximum responses with mixed results (2 success, 1 error)
        compiled_sim.exhaustive(async || {
            in_send.send((2, Ok::<(), ()>(())));
            in_send.send((2, Ok(())));
            in_send.send((2, Err(())));

            success_recv.assert_yields_only_unordered([2]).await;
            error_recv.assert_yields_only([(2, ())]).await;
        });

        // Test case 3: Key doesn't reach quorum (1 success, 2 errors)
        compiled_sim.exhaustive(async || {
            in_send.send((3, Ok::<(), ()>(())));
            in_send.send((3, Err(())));
            in_send.send((3, Err(())));

            success_recv.assert_no_more().await;
            error_recv.assert_yields_only([(3, ()), (3, ())]).await;
        });

        // Test case 4: Key reaches quorum with extra responses
        compiled_sim.exhaustive(async || {
            in_send.send((4, Ok::<(), ()>(())));
            in_send.send((4, Ok(())));
            in_send.send((4, Ok(()))); // This should be ignored after quorum

            success_recv.assert_yields_only_unordered([4]).await;
            error_recv.assert_no_more().await;
        });

        // Test case 5: Key with only errors (no quorum)
        compiled_sim.exhaustive(async || {
            in_send.send((5, Err::<(), ()>(())));
            in_send.send((5, Err(())));
            in_send.send((5, Err(())));

            success_recv.assert_no_more().await;
            error_recv
                .assert_yields_only([(5, ()), (5, ()), (5, ())])
                .await;
        });

        // Test case 6: Key that reaches quorum exactly at max (2 success, 1 error)
        compiled_sim.exhaustive(async || {
            in_send.send((6, Err::<(), ()>(())));
            in_send.send((6, Ok(())));
            in_send.send((6, Ok(())));

            success_recv.assert_yields_only_unordered([6]).await;
            error_recv.assert_yields_only([(6, ())]).await;
        });
    }

    #[test]
    fn collect_quorum_min_equals_max() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let success_recv = collect_quorum(input, 2, 2).0.sim_output();

        flow.sim().exhaustive(async || {
            // When min == max, a key needs exactly that many successful responses
            in_send.send((1, Ok::<(), ()>(())));
            in_send.send((1, Ok(())));

            // This key gets exactly 2 responses (1 success, 1 error) - should not reach quorum
            in_send.send((2, Ok(())));
            in_send.send((2, Err(())));

            // This key gets 2 successes - should reach quorum
            in_send.send((3, Ok(())));
            in_send.send((3, Ok(())));

            // Only keys 1 and 3 should reach quorum (both have 2 successes)
            success_recv.assert_yields_only_unordered([1, 3]).await;
        });
    }

    #[test]
    fn collect_quorum_single_response() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let success_recv = collect_quorum(input, 1, 1).0.sim_output();

        flow.sim().exhaustive(async || {
            // With min=max=1, any single success should immediately reach quorum
            in_send.send((1, Ok::<(), ()>(())));
            in_send.send((2, Err(())));
            in_send.send((3, Ok(())));

            // Keys 1 and 3 should reach quorum immediately
            success_recv.assert_yields_only_unordered([1, 3]).await;
        });
    }

    #[test]
    fn collect_quorum_no_responses() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (_in_send, input) = node.sim_input::<_, TotalOrder, _>();
        let success_recv = {
            let (success, _error) = collect_quorum::<_, _, i32, ()>(input, 2, 3);
            success.sim_output()
        };

        flow.sim().exhaustive(async || {
            // No responses sent - should get empty results
            success_recv.assert_no_more().await;
        });
    }

    #[test]
    fn collect_quorum_no_double_quorum_before_max() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
        let success_recv = collect_quorum(input, 2, 4).0.sim_output();

        flow.sim().exhaustive(async || {
            // Key 1: First reaches quorum with 2 successes
            in_send.send((1, Ok::<(), ()>(())));
            in_send.send((1, Ok(())));

            // Key 1: Additional responses after quorum - should not trigger quorum again
            in_send.send((1, Ok(())));
            in_send.send((1, Ok(())));

            // Key 2: Reaches quorum later with mixed responses
            in_send.send((2, Err(())));
            in_send.send((2, Ok(())));
            in_send.send((2, Ok(())));
            in_send.send((2, Err(()))); // Additional error after quorum

            // Each key should appear exactly once, even though they received
            // additional responses after reaching quorum
            success_recv.assert_yields_only_unordered([1, 2]).await;
        });
    }
}