1use std::hash::Hash;
2
3use hydro_lang::live_collections::stream::{NoOrder, Ordering};
4use hydro_lang::location::{Location, NoTick};
5use hydro_lang::prelude::*;
6
7#[expect(clippy::type_complexity, reason = "stream types with ordering")]
8pub fn collect_quorum_with_response<
9 'a,
10 L: Location<'a> + NoTick,
11 Order: Ordering,
12 K: Clone + Eq + Hash,
13 V: Clone,
14 E: Clone,
15>(
16 responses: Stream<(K, Result<V, E>), L, Unbounded, Order>,
17 min: usize,
18 max: usize,
19) -> (
20 Stream<(K, V), L, Unbounded, Order>,
21 Stream<(K, E), L, Unbounded, Order>,
22) {
23 let quorums = sliced! {
24 let new_inputs = use(responses.clone(), nondet!(
25 ));
28
29 let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
30 let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
31
32 let current_responses = not_all.chain(new_inputs);
33
34 let count_per_key = current_responses.clone().into_keyed().fold(
35 q!(move || (0, 0)),
36 q!(move |accum, value| {
37 if value.is_ok() {
38 accum.0 += 1;
39 } else {
40 accum.1 += 1;
41 }
42 }, commutative = manual_proof!()),
43 );
44
45 let not_reached_min_count = count_per_key
46 .clone()
47 .filter(q!(move |(success, _error)| success < &min))
48 .keys();
49
50 let reached_min_count = count_per_key
51 .clone()
52 .filter(q!(move |(success, _error)| success >= &min))
53 .keys();
54
55 let just_reached_quorum = if max == min {
56 not_all = current_responses.clone().anti_join(reached_min_count);
57
58 current_responses.anti_join(not_reached_min_count)
59 } else {
60 let received_from_all = count_per_key
61 .filter(q!(move |(success, error)| (success + error) >= max))
62 .keys();
63
64 not_all = current_responses.clone().anti_join(received_from_all.clone());
65
66 let out = current_responses
67 .anti_join(not_reached_min_count)
68 .anti_join(min_but_not_max);
69
70 min_but_not_max = reached_min_count.filter_not_in(received_from_all);
71
72 out
73 };
74
75 just_reached_quorum.filter_map(q!(move |(key, res)| match res {
76 Ok(v) => Some((key, v)),
77 Err(_) => None,
78 }))
79 };
80
81 (
82 quorums,
83 responses.filter_map(q!(move |(key, res)| match res {
84 Ok(_) => None,
85 Err(e) => Some((key, e)),
86 })),
87 )
88}
89
90#[expect(clippy::type_complexity, reason = "stream types with ordering")]
91pub fn collect_quorum<
92 'a,
93 L: Location<'a> + NoTick,
94 Order: Ordering,
95 K: Clone + Eq + Hash,
96 E: Clone,
97>(
98 responses: Stream<(K, Result<(), E>), L, Unbounded, Order>,
99 min: usize,
100 max: usize,
101) -> (
102 Stream<K, L, Unbounded, NoOrder>,
103 Stream<(K, E), L, Unbounded, Order>,
104) {
105 let just_reached_quorum = sliced! {
106 let new_inputs = use(responses.clone(), nondet!(
107 ));
110
111 let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
112 let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
113
114 let current_responses = not_all.chain(new_inputs);
115
116 let count_per_key = current_responses.clone().into_keyed().fold(
117 q!(move || (0, 0)),
118 q!(move |accum, value| {
119 if value.is_ok() {
120 accum.0 += 1;
121 } else {
122 accum.1 += 1;
123 }
124 }, commutative = manual_proof!()),
125 );
126
127 let reached_min_count = count_per_key
128 .clone()
129 .entries()
130 .filter_map(q!(move |(key, (success, _error))| if success >= min {
131 Some(key)
132 } else {
133 None
134 }));
135
136 let just_reached_quorum = if max == min {
137 not_all = current_responses.anti_join(reached_min_count.clone());
138
139 reached_min_count
140 } else {
141 let received_from_all = count_per_key
142 .filter(q!(move |(success, error)| (success + error) >= max))
143 .keys();
144
145 not_all = current_responses.anti_join(received_from_all.clone());
146
147 let out = reached_min_count.clone().filter_not_in(min_but_not_max);
148
149 min_but_not_max = reached_min_count.filter_not_in(received_from_all);
150
151 out
152 };
153
154 just_reached_quorum
155 };
156
157 (
158 just_reached_quorum,
159 responses.filter_map(q!(move |(key, res)| match res {
160 Ok(_) => None,
161 Err(e) => Some((key, e)),
162 })),
163 )
164}
165
166#[cfg(test)]
167mod tests {
168 use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
169 use hydro_lang::prelude::*;
170
171 use super::{collect_quorum, collect_quorum_with_response};
172
173 #[test]
174 fn collect_quorum_with_response_preserves_order() {
175 let mut flow = FlowBuilder::new();
176 let node = flow.process::<()>();
177
178 let (in_send, input) = node.sim_input();
179 let out_recv = collect_quorum_with_response(input, 3, 3).0.sim_output();
180
181 flow.sim().exhaustive(async || {
182 in_send.send((1, Ok::<(), ()>(())));
183 in_send.send((1, Ok(())));
184 in_send.send((1, Ok(())));
185 in_send.send((2, Ok(())));
186 in_send.send((2, Ok(())));
187 in_send.send((3, Ok(())));
188 in_send.send((3, Ok(())));
189 in_send.send((3, Ok(())));
190
191 assert_eq!(
192 out_recv.collect::<Vec<_>>().await,
193 vec![(1, ()), (1, ()), (1, ()), (3, ()), (3, ()), (3, ())]
194 )
195 });
196 }
197
198 #[test]
199 fn collect_quorum_with_response_no_order() {
200 let mut flow = FlowBuilder::new();
201 let node = flow.process::<()>();
202
203 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
204 let out_recv = collect_quorum_with_response(input, 2, 2).0.sim_output();
205
206 flow.sim().exhaustive(async || {
207 in_send.send_many_unordered([
208 (1, Ok::<(), ()>(())),
209 (1, Ok(())),
210 (2, Ok(())),
211 (3, Ok(())),
212 (3, Ok(())),
213 ]);
214
215 out_recv
216 .assert_yields_only_unordered([(1, ()), (1, ()), (3, ()), (3, ())])
217 .await;
218 });
219 }
220
221 #[test]
222 fn collect_quorum_functionality() {
223 let mut flow = FlowBuilder::new();
224 let node = flow.process::<()>();
225
226 let (in_send, input) = node.sim_input();
227 let (success_recv, error_recv) = {
228 let (success, error) = collect_quorum(input, 2, 3);
229 (success.sim_output(), error.sim_output())
230 };
231
232 let compiled_sim = flow.sim().compiled();
233
234 compiled_sim.exhaustive(async || {
236 in_send.send((1, Ok::<(), ()>(())));
237 in_send.send((1, Ok(())));
238
239 success_recv.assert_yields_only_unordered([1]).await;
240 error_recv.assert_no_more().await;
241 });
242
243 compiled_sim.exhaustive(async || {
245 in_send.send((2, Ok::<(), ()>(())));
246 in_send.send((2, Ok(())));
247 in_send.send((2, Err(())));
248
249 success_recv.assert_yields_only_unordered([2]).await;
250 error_recv.assert_yields_only([(2, ())]).await;
251 });
252
253 compiled_sim.exhaustive(async || {
255 in_send.send((3, Ok::<(), ()>(())));
256 in_send.send((3, Err(())));
257 in_send.send((3, Err(())));
258
259 success_recv.assert_no_more().await;
260 error_recv.assert_yields_only([(3, ()), (3, ())]).await;
261 });
262
263 compiled_sim.exhaustive(async || {
265 in_send.send((4, Ok::<(), ()>(())));
266 in_send.send((4, Ok(())));
267 in_send.send((4, Ok(()))); success_recv.assert_yields_only_unordered([4]).await;
270 error_recv.assert_no_more().await;
271 });
272
273 compiled_sim.exhaustive(async || {
275 in_send.send((5, Err::<(), ()>(())));
276 in_send.send((5, Err(())));
277 in_send.send((5, Err(())));
278
279 success_recv.assert_no_more().await;
280 error_recv
281 .assert_yields_only([(5, ()), (5, ()), (5, ())])
282 .await;
283 });
284
285 compiled_sim.exhaustive(async || {
287 in_send.send((6, Err::<(), ()>(())));
288 in_send.send((6, Ok(())));
289 in_send.send((6, Ok(())));
290
291 success_recv.assert_yields_only_unordered([6]).await;
292 error_recv.assert_yields_only([(6, ())]).await;
293 });
294 }
295
296 #[test]
297 fn collect_quorum_min_equals_max() {
298 let mut flow = FlowBuilder::new();
299 let node = flow.process::<()>();
300
301 let (in_send, input) = node.sim_input();
302 let success_recv = collect_quorum(input, 2, 2).0.sim_output();
303
304 flow.sim().exhaustive(async || {
305 in_send.send((1, Ok::<(), ()>(())));
307 in_send.send((1, Ok(())));
308
309 in_send.send((2, Ok(())));
311 in_send.send((2, Err(())));
312
313 in_send.send((3, Ok(())));
315 in_send.send((3, Ok(())));
316
317 success_recv.assert_yields_only_unordered([1, 3]).await;
319 });
320 }
321
322 #[test]
323 fn collect_quorum_single_response() {
324 let mut flow = FlowBuilder::new();
325 let node = flow.process::<()>();
326
327 let (in_send, input) = node.sim_input();
328 let success_recv = collect_quorum(input, 1, 1).0.sim_output();
329
330 flow.sim().exhaustive(async || {
331 in_send.send((1, Ok::<(), ()>(())));
333 in_send.send((2, Err(())));
334 in_send.send((3, Ok(())));
335
336 success_recv.assert_yields_only_unordered([1, 3]).await;
338 });
339 }
340
341 #[test]
342 fn collect_quorum_no_responses() {
343 let mut flow = FlowBuilder::new();
344 let node = flow.process::<()>();
345
346 let (_in_send, input) = node.sim_input::<_, TotalOrder, _>();
347 let success_recv = {
348 let (success, _error) = collect_quorum::<_, _, i32, ()>(input, 2, 3);
349 success.sim_output()
350 };
351
352 flow.sim().exhaustive(async || {
353 success_recv.assert_no_more().await;
355 });
356 }
357
358 #[test]
359 fn collect_quorum_no_double_quorum_before_max() {
360 let mut flow = FlowBuilder::new();
361 let node = flow.process::<()>();
362
363 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
364 let success_recv = collect_quorum(input, 2, 4).0.sim_output();
365
366 flow.sim().exhaustive(async || {
367 in_send.send((1, Ok::<(), ()>(())));
369 in_send.send((1, Ok(())));
370
371 in_send.send((1, Ok(())));
373 in_send.send((1, Ok(())));
374
375 in_send.send((2, Err(())));
377 in_send.send((2, Ok(())));
378 in_send.send((2, Ok(())));
379 in_send.send((2, Err(()))); success_recv.assert_yields_only_unordered([1, 2]).await;
384 });
385 }
386}