hydro_std/
quorum.rs

1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6#[expect(clippy::type_complexity, reason = "stream types with ordering")]
7pub fn collect_quorum_with_response<
8    'a,
9    L: Location<'a> + NoTick,
10    Order,
11    K: Clone + Eq + Hash,
12    V: Clone,
13    E: Clone,
14>(
15    responses: Stream<(K, Result<V, E>), Atomic<L>, Unbounded, Order>,
16    min: usize,
17    max: usize,
18) -> (
19    Stream<(K, V), Atomic<L>, Unbounded, Order>,
20    Stream<(K, E), Atomic<L>, Unbounded, Order>,
21) {
22    let tick = responses.atomic_source();
23    let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
24
25    let current_responses = not_all.chain(unsafe {
26        // SAFETY: we always persist values that have not reached quorum, so even
27        // with arbitrary batching we always produce deterministic quorum results
28        responses.clone().tick_batch()
29    });
30
31    let count_per_key = current_responses.clone().fold_keyed_commutative(
32        q!(move || (0, 0)),
33        q!(move |accum, value| {
34            if value.is_ok() {
35                accum.0 += 1;
36            } else {
37                accum.1 += 1;
38            }
39        }),
40    );
41
42    let not_reached_min_count =
43        count_per_key
44            .clone()
45            .filter_map(q!(move |(key, (success, _error))| if success < min {
46                Some(key)
47            } else {
48                None
49            }));
50
51    let reached_min_count =
52        count_per_key
53            .clone()
54            .filter_map(q!(move |(key, (success, _error))| if success >= min {
55                Some(key)
56            } else {
57                None
58            }));
59
60    let just_reached_quorum = if max == min {
61        not_all_complete_cycle
62            .complete_next_tick(current_responses.clone().anti_join(reached_min_count));
63
64        current_responses.anti_join(not_reached_min_count)
65    } else {
66        let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
67
68        let received_from_all =
69            count_per_key.filter_map(q!(
70                move |(key, (success, error))| if (success + error) >= max {
71                    Some(key)
72                } else {
73                    None
74                }
75            ));
76
77        min_but_not_max_complete_cycle
78            .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone()));
79
80        not_all_complete_cycle
81            .complete_next_tick(current_responses.clone().anti_join(received_from_all));
82
83        current_responses
84            .anti_join(not_reached_min_count)
85            .anti_join(min_but_not_max)
86    };
87
88    (
89        just_reached_quorum
90            .filter_map(q!(move |(key, res)| match res {
91                Ok(v) => Some((key, v)),
92                Err(_) => None,
93            }))
94            .all_ticks_atomic(),
95        responses.filter_map(q!(move |(key, res)| match res {
96            Ok(_) => None,
97            Err(e) => Some((key, e)),
98        })),
99    )
100}
101
102#[expect(clippy::type_complexity, reason = "stream types with ordering")]
103pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>(
104    responses: Stream<(K, Result<(), E>), Atomic<L>, Unbounded, Order>,
105    min: usize,
106    max: usize,
107) -> (
108    Stream<K, Atomic<L>, Unbounded, Order>,
109    Stream<(K, E), Atomic<L>, Unbounded, Order>,
110) {
111    let tick = responses.atomic_source();
112    let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
113
114    let current_responses = not_all.chain(unsafe {
115        // SAFETY: we always persist values that have not reached quorum, so even
116        // with arbitrary batching we always produce deterministic quorum results
117        responses.clone().tick_batch()
118    });
119
120    let count_per_key = current_responses.clone().fold_keyed_commutative(
121        q!(move || (0, 0)),
122        q!(move |accum, value| {
123            if value.is_ok() {
124                accum.0 += 1;
125            } else {
126                accum.1 += 1;
127            }
128        }),
129    );
130
131    let reached_min_count =
132        count_per_key
133            .clone()
134            .filter_map(q!(move |(key, (success, _error))| if success >= min {
135                Some(key)
136            } else {
137                None
138            }));
139
140    let just_reached_quorum = if max == min {
141        not_all_complete_cycle.complete_next_tick(
142            current_responses
143                .clone()
144                .anti_join(reached_min_count.clone()),
145        );
146
147        reached_min_count
148    } else {
149        let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
150
151        let received_from_all =
152            count_per_key.filter_map(q!(
153                move |(key, (success, error))| if (success + error) >= max {
154                    Some(key)
155                } else {
156                    None
157                }
158            ));
159
160        min_but_not_max_complete_cycle.complete_next_tick(
161            reached_min_count
162                .clone()
163                .filter_not_in(received_from_all.clone()),
164        );
165
166        not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all));
167
168        reached_min_count.filter_not_in(min_but_not_max)
169    };
170
171    (
172        just_reached_quorum.all_ticks_atomic(),
173        responses.filter_map(q!(move |(key, res)| match res {
174            Ok(_) => None,
175            Err(e) => Some((key, e)),
176        })),
177    )
178}