1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6#[expect(clippy::type_complexity, reason = "stream types with ordering")]
7pub fn collect_quorum_with_response<
8 'a,
9 L: Location<'a> + NoTick,
10 Order,
11 K: Clone + Eq + Hash,
12 V: Clone,
13 E: Clone,
14>(
15 responses: Stream<(K, Result<V, E>), Atomic<L>, Unbounded, Order>,
16 min: usize,
17 max: usize,
18) -> (
19 Stream<(K, V), Atomic<L>, Unbounded, Order>,
20 Stream<(K, E), Atomic<L>, Unbounded, Order>,
21) {
22 let tick = responses.atomic_source();
23 let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
24
25 let current_responses = not_all.chain(responses.clone().batch(nondet!(
26 )));
29
30 let count_per_key = current_responses.clone().into_keyed().fold_commutative(
31 q!(move || (0, 0)),
32 q!(move |accum, value| {
33 if value.is_ok() {
34 accum.0 += 1;
35 } else {
36 accum.1 += 1;
37 }
38 }),
39 );
40
41 let not_reached_min_count = count_per_key
42 .clone()
43 .filter(q!(move |(success, _error)| success < &min))
44 .keys();
45
46 let reached_min_count = count_per_key
47 .clone()
48 .filter(q!(move |(success, _error)| success >= &min))
49 .keys();
50
51 let just_reached_quorum = if max == min {
52 not_all_complete_cycle
53 .complete_next_tick(current_responses.clone().anti_join(reached_min_count));
54
55 current_responses.anti_join(not_reached_min_count)
56 } else {
57 let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
58
59 let received_from_all = count_per_key
60 .filter(q!(move |(success, error)| (success + error) >= max))
61 .keys();
62
63 min_but_not_max_complete_cycle
64 .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone()));
65
66 not_all_complete_cycle
67 .complete_next_tick(current_responses.clone().anti_join(received_from_all));
68
69 current_responses
70 .anti_join(not_reached_min_count)
71 .anti_join(min_but_not_max)
72 };
73
74 (
75 just_reached_quorum
76 .filter_map(q!(move |(key, res)| match res {
77 Ok(v) => Some((key, v)),
78 Err(_) => None,
79 }))
80 .all_ticks_atomic(),
81 responses.filter_map(q!(move |(key, res)| match res {
82 Ok(_) => None,
83 Err(e) => Some((key, e)),
84 })),
85 )
86}
87
88#[expect(clippy::type_complexity, reason = "stream types with ordering")]
89pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>(
90 responses: Stream<(K, Result<(), E>), Atomic<L>, Unbounded, Order>,
91 min: usize,
92 max: usize,
93) -> (
94 Stream<K, Atomic<L>, Unbounded, NoOrder>,
95 Stream<(K, E), Atomic<L>, Unbounded, Order>,
96) {
97 let tick = responses.atomic_source();
98 let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
99
100 let current_responses = not_all.chain(responses.clone().batch(nondet!(
101 )));
104
105 let count_per_key = current_responses.clone().into_keyed().fold_commutative(
106 q!(move || (0, 0)),
107 q!(move |accum, value| {
108 if value.is_ok() {
109 accum.0 += 1;
110 } else {
111 accum.1 += 1;
112 }
113 }),
114 );
115
116 let reached_min_count =
117 count_per_key
118 .clone()
119 .entries()
120 .filter_map(q!(move |(key, (success, _error))| if success >= min {
121 Some(key)
122 } else {
123 None
124 }));
125
126 let just_reached_quorum = if max == min {
127 not_all_complete_cycle.complete_next_tick(
128 current_responses
129 .clone()
130 .anti_join(reached_min_count.clone()),
131 );
132
133 reached_min_count
134 } else {
135 let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
136
137 let received_from_all = count_per_key
138 .filter(q!(move |(success, error)| (success + error) >= max))
139 .keys();
140
141 min_but_not_max_complete_cycle.complete_next_tick(
142 reached_min_count
143 .clone()
144 .filter_not_in(received_from_all.clone()),
145 );
146
147 not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all));
148
149 reached_min_count.filter_not_in(min_but_not_max)
150 };
151
152 (
153 just_reached_quorum.all_ticks_atomic(),
154 responses.filter_map(q!(move |(key, res)| match res {
155 Ok(_) => None,
156 Err(e) => Some((key, e)),
157 })),
158 )
159}