1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6#[expect(clippy::type_complexity, reason = "stream types with ordering")]
7pub fn collect_quorum_with_response<
8 'a,
9 L: Location<'a> + NoTick,
10 Order,
11 K: Clone + Eq + Hash,
12 V: Clone,
13 E: Clone,
14>(
15 responses: Stream<(K, Result<V, E>), Atomic<L>, Unbounded, Order>,
16 min: usize,
17 max: usize,
18) -> (
19 Stream<(K, V), Atomic<L>, Unbounded, Order>,
20 Stream<(K, E), Atomic<L>, Unbounded, Order>,
21) {
22 let tick = responses.atomic_source();
23 let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
24
25 let current_responses = not_all.chain(unsafe {
26 responses.clone().tick_batch()
29 });
30
31 let count_per_key = current_responses.clone().fold_keyed_commutative(
32 q!(move || (0, 0)),
33 q!(move |accum, value| {
34 if value.is_ok() {
35 accum.0 += 1;
36 } else {
37 accum.1 += 1;
38 }
39 }),
40 );
41
42 let not_reached_min_count =
43 count_per_key
44 .clone()
45 .filter_map(q!(move |(key, (success, _error))| if success < min {
46 Some(key)
47 } else {
48 None
49 }));
50
51 let reached_min_count =
52 count_per_key
53 .clone()
54 .filter_map(q!(move |(key, (success, _error))| if success >= min {
55 Some(key)
56 } else {
57 None
58 }));
59
60 let just_reached_quorum = if max == min {
61 not_all_complete_cycle
62 .complete_next_tick(current_responses.clone().anti_join(reached_min_count));
63
64 current_responses.anti_join(not_reached_min_count)
65 } else {
66 let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
67
68 let received_from_all =
69 count_per_key.filter_map(q!(
70 move |(key, (success, error))| if (success + error) >= max {
71 Some(key)
72 } else {
73 None
74 }
75 ));
76
77 min_but_not_max_complete_cycle
78 .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone()));
79
80 not_all_complete_cycle
81 .complete_next_tick(current_responses.clone().anti_join(received_from_all));
82
83 current_responses
84 .anti_join(not_reached_min_count)
85 .anti_join(min_but_not_max)
86 };
87
88 (
89 just_reached_quorum
90 .filter_map(q!(move |(key, res)| match res {
91 Ok(v) => Some((key, v)),
92 Err(_) => None,
93 }))
94 .all_ticks_atomic(),
95 responses.filter_map(q!(move |(key, res)| match res {
96 Ok(_) => None,
97 Err(e) => Some((key, e)),
98 })),
99 )
100}
101
102#[expect(clippy::type_complexity, reason = "stream types with ordering")]
103pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>(
104 responses: Stream<(K, Result<(), E>), Atomic<L>, Unbounded, Order>,
105 min: usize,
106 max: usize,
107) -> (
108 Stream<K, Atomic<L>, Unbounded, Order>,
109 Stream<(K, E), Atomic<L>, Unbounded, Order>,
110) {
111 let tick = responses.atomic_source();
112 let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
113
114 let current_responses = not_all.chain(unsafe {
115 responses.clone().tick_batch()
118 });
119
120 let count_per_key = current_responses.clone().fold_keyed_commutative(
121 q!(move || (0, 0)),
122 q!(move |accum, value| {
123 if value.is_ok() {
124 accum.0 += 1;
125 } else {
126 accum.1 += 1;
127 }
128 }),
129 );
130
131 let reached_min_count =
132 count_per_key
133 .clone()
134 .filter_map(q!(move |(key, (success, _error))| if success >= min {
135 Some(key)
136 } else {
137 None
138 }));
139
140 let just_reached_quorum = if max == min {
141 not_all_complete_cycle.complete_next_tick(
142 current_responses
143 .clone()
144 .anti_join(reached_min_count.clone()),
145 );
146
147 reached_min_count
148 } else {
149 let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
150
151 let received_from_all =
152 count_per_key.filter_map(q!(
153 move |(key, (success, error))| if (success + error) >= max {
154 Some(key)
155 } else {
156 None
157 }
158 ));
159
160 min_but_not_max_complete_cycle.complete_next_tick(
161 reached_min_count
162 .clone()
163 .filter_not_in(received_from_all.clone()),
164 );
165
166 not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all));
167
168 reached_min_count.filter_not_in(min_but_not_max)
169 };
170
171 (
172 just_reached_quorum.all_ticks_atomic(),
173 responses.filter_map(q!(move |(key, res)| match res {
174 Ok(_) => None,
175 Err(e) => Some((key, e)),
176 })),
177 )
178}