hydro_std/
quorum.rs

1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6#[expect(clippy::type_complexity, reason = "stream types with ordering")]
7pub fn collect_quorum_with_response<
8    'a,
9    L: Location<'a> + NoTick,
10    Order,
11    K: Clone + Eq + Hash,
12    V: Clone,
13    E: Clone,
14>(
15    responses: Stream<(K, Result<V, E>), Atomic<L>, Unbounded, Order>,
16    min: usize,
17    max: usize,
18) -> (
19    Stream<(K, V), Atomic<L>, Unbounded, Order>,
20    Stream<(K, E), Atomic<L>, Unbounded, Order>,
21) {
22    let tick = responses.atomic_source();
23    let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
24
25    let current_responses = not_all.chain(responses.clone().batch(nondet!(
26        /// We always persist values that have not reached quorum, so even
27        /// with arbitrary batching we always produce deterministic quorum results.
28    )));
29
30    let count_per_key = current_responses.clone().into_keyed().fold_commutative(
31        q!(move || (0, 0)),
32        q!(move |accum, value| {
33            if value.is_ok() {
34                accum.0 += 1;
35            } else {
36                accum.1 += 1;
37            }
38        }),
39    );
40
41    let not_reached_min_count = count_per_key
42        .clone()
43        .filter(q!(move |(success, _error)| success < &min))
44        .keys();
45
46    let reached_min_count = count_per_key
47        .clone()
48        .filter(q!(move |(success, _error)| success >= &min))
49        .keys();
50
51    let just_reached_quorum = if max == min {
52        not_all_complete_cycle
53            .complete_next_tick(current_responses.clone().anti_join(reached_min_count));
54
55        current_responses.anti_join(not_reached_min_count)
56    } else {
57        let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
58
59        let received_from_all = count_per_key
60            .filter(q!(move |(success, error)| (success + error) >= max))
61            .keys();
62
63        min_but_not_max_complete_cycle
64            .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone()));
65
66        not_all_complete_cycle
67            .complete_next_tick(current_responses.clone().anti_join(received_from_all));
68
69        current_responses
70            .anti_join(not_reached_min_count)
71            .anti_join(min_but_not_max)
72    };
73
74    (
75        just_reached_quorum
76            .filter_map(q!(move |(key, res)| match res {
77                Ok(v) => Some((key, v)),
78                Err(_) => None,
79            }))
80            .all_ticks_atomic(),
81        responses.filter_map(q!(move |(key, res)| match res {
82            Ok(_) => None,
83            Err(e) => Some((key, e)),
84        })),
85    )
86}
87
88#[expect(clippy::type_complexity, reason = "stream types with ordering")]
89pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>(
90    responses: Stream<(K, Result<(), E>), Atomic<L>, Unbounded, Order>,
91    min: usize,
92    max: usize,
93) -> (
94    Stream<K, Atomic<L>, Unbounded, NoOrder>,
95    Stream<(K, E), Atomic<L>, Unbounded, Order>,
96) {
97    let tick = responses.atomic_source();
98    let (not_all_complete_cycle, not_all) = tick.cycle::<Stream<_, _, _, Order>>();
99
100    let current_responses = not_all.chain(responses.clone().batch(nondet!(
101        /// We always persist values that have not reached quorum, so even
102        /// with arbitrary batching we always produce deterministic quorum results.
103    )));
104
105    let count_per_key = current_responses.clone().into_keyed().fold_commutative(
106        q!(move || (0, 0)),
107        q!(move |accum, value| {
108            if value.is_ok() {
109                accum.0 += 1;
110            } else {
111                accum.1 += 1;
112            }
113        }),
114    );
115
116    let reached_min_count =
117        count_per_key
118            .clone()
119            .entries()
120            .filter_map(q!(move |(key, (success, _error))| if success >= min {
121                Some(key)
122            } else {
123                None
124            }));
125
126    let just_reached_quorum = if max == min {
127        not_all_complete_cycle.complete_next_tick(
128            current_responses
129                .clone()
130                .anti_join(reached_min_count.clone()),
131        );
132
133        reached_min_count
134    } else {
135        let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle();
136
137        let received_from_all = count_per_key
138            .filter(q!(move |(success, error)| (success + error) >= max))
139            .keys();
140
141        min_but_not_max_complete_cycle.complete_next_tick(
142            reached_min_count
143                .clone()
144                .filter_not_in(received_from_all.clone()),
145        );
146
147        not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all));
148
149        reached_min_count.filter_not_in(min_but_not_max)
150    };
151
152    (
153        just_reached_quorum.all_ticks_atomic(),
154        responses.filter_map(q!(move |(key, res)| match res {
155            Ok(_) => None,
156            Err(e) => Some((key, e)),
157        })),
158    )
159}