hydro_test/cluster/compartmentalized_paxos.rs

1use std::collections::HashMap;
2
3use hydro_lang::*;
4use hydro_std::quorum::collect_quorum;
5use hydro_std::request_response::join_responses;
6use serde::{Deserialize, Serialize};
7
8use super::paxos::{
9    Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
10    index_payloads, leader_election, recommit_after_leader_election,
11};
12use super::paxos_with_client::PaxosLike;
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct ProxyLeader {}
16
17#[derive(Clone, Copy)]
18pub struct CompartmentalizedPaxosConfig {
19    pub paxos_config: PaxosConfig,
20    pub num_proxy_leaders: usize,
21    /// Number of rows in the acceptor grid. Each row represents a write quorum (for sending p2as).
22    pub acceptor_grid_rows: usize,
23    /// Number of columns in the acceptor grid. Each column represents a read quorum (for waiting for p1bs).
24    pub acceptor_grid_cols: usize,
25    pub num_replicas: usize,
26    /// How long to wait before resending message to a different write quorum
27    pub acceptor_retry_timeout: u64,
28}
29
/// The clusters that participate in Compartmentalized Paxos, together with its configuration.
///
/// Implements [`PaxosLike`]: payloads enter at `proposers`, the log is stored at `acceptors`,
/// and committed `(slot, payload)` pairs are emitted at `proxy_leaders`.
pub struct CoreCompartmentalizedPaxos<'a> {
    /// Proposers: run leader election and assign slots to incoming payloads.
    pub proposers: Cluster<'a, Proposer>,
    /// Proxy leaders: relay p2as to an acceptor row and collect the matching p2bs.
    pub proxy_leaders: Cluster<'a, ProxyLeader>,
    /// Acceptors, logically arranged as a `acceptor_grid_rows x acceptor_grid_cols` grid.
    pub acceptors: Cluster<'a, Acceptor>,
    /// Deployment parameters (grid shape, number of proxy leaders, ...).
    pub config: CompartmentalizedPaxosConfig,
}
36
37impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
38    type PaxosIn = Proposer;
39    type PaxosLog = Acceptor;
40    type PaxosOut = ProxyLeader;
41    type Ballot = Ballot;
42
43    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
44        &self.proposers
45    }
46
47    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
48        &self.acceptors
49    }
50
51    fn get_recipient_from_ballot<L: Location<'a>>(
52        ballot: Optional<Self::Ballot, L, Unbounded>,
53    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
54        ballot.map(q!(|ballot| ballot.proposer_id))
55    }
56
57    fn build<P: PaxosPayload>(
58        self,
59        with_ballot: impl FnOnce(
60            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
61        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
62        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
63        nondet_leader: NonDet,
64        nondet_commit: NonDet,
65    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
66        compartmentalized_paxos_core(
67            &self.proposers,
68            &self.proxy_leaders,
69            &self.acceptors,
70            a_checkpoint,
71            with_ballot,
72            self.config,
73            nondet_leader,
74            nondet_commit,
75        )
76        .1
77    }
78}
79
/// Implements the Compartmentalized Paxos algorithm as described in "Scaling Replicated State Machines with Compartmentalization",
/// which augments regular Paxos with a cluster of Proxy Leaders.
///
/// Proposers that wish to broadcast p2as to acceptors or collect p2bs from acceptors instead
/// go through the Proxy Leaders, which offload networking. The slot is used to determine which Proxy Leader to offload to
/// (slot `s` is handled by proxy leader `s % num_proxy_leaders`).
/// Acceptors are arranged into a grid, where each row and column must have at least f+1 members.
/// Rows represent "write quorums"; an entire row of acceptors must accept a p2a before its payload is committed.
/// Columns represent "read quorums"; an entire column of acceptors must respond with p1bs before a proposer is elected the leader.
/// Read and write quorums were introduced in "Flexible Paxos: Quorum Intersection Revisited".
///
/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
/// and a stream of sequenced payloads with an index and optional payload (in the case of
/// holes in the log).
///
/// # Arguments
/// - `proposers`, `proxy_leaders`, `acceptors`: the three participating clusters.
/// - `a_checkpoint`: forwarded to `acceptor_p2` (presumably the slot up to which acceptors may
///   truncate their log — confirm against `acceptor_p2`).
/// - `c_to_proposers`: given the stream of ballots emitted when a proposer becomes leader, returns
///   the client payloads to be sequenced at the proposers.
/// - `config`: grid shape, number of proxy leaders, and base Paxos parameters.
/// - `nondet_leader`, `nondet_commit_leader_change`: non-determinism guards, see below.
///
/// # Non-Determinism
/// When the leader is stable, the algorithm will commit incoming payloads to the leader
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
pub fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    proxy_leaders: &Cluster<'a, ProxyLeader>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: CompartmentalizedPaxosConfig,
    nondet_leader: NonDet,
    nondet_commit_leader_change: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
) {
    // Debug output: each cluster member prints a single greeting line at startup.
    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    proxy_leaders
        .source_iter(q!(["Proxy leaders say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let proxy_leader_tick = proxy_leaders.tick();
    let acceptor_tick = acceptors.tick();

    // Leader election is built before sequencing but consumes two of sequencing's outputs:
    // the ballots reported by failed p2bs and a snapshot of the acceptor log. Both are
    // declared as forward references here and completed at the end of this function.
    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    // p1b quorum size is `acceptor_grid_rows` (the size of one grid column) out of all
    // `rows * cols` acceptors.
    // NOTE(review): if `leader_election` counts any `acceptor_grid_rows` p1bs rather than a full
    // column, the read quorum is not guaranteed to contain a member of every row (write quorum),
    // as Flexible Paxos requires. Confirm against `leader_election`.
    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        config.acceptor_grid_rows,
        config.acceptor_grid_rows * config.acceptor_grid_cols,
        config.paxos_config,
        sequencing_max_ballot_forward_reference,
        a_log_forward_reference,
        nondet!(
            /// The primary non-determinism exposed by leader election algorithm lies in which leader
            /// is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
            /// or leader flag will only lead to failure in sequencing rather than committing the wrong value.
            nondet_leader
        ),
        nondet!(
            /// Because ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
            /// guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
        ),
    );

    // Present only in the first tick in which this proposer is leader: it is leader now
    // but was not leader in the previous tick.
    let just_became_leader = p_is_leader
        .clone()
        .continue_unless(p_is_leader.clone().defer_tick());

    // The caller receives the new ballot once per election win and returns the payloads to sequence.
    let c_to_proposers = c_to_proposers(
        just_became_leader
            .clone()
            .then(p_ballot.clone())
            .all_ticks(),
    );

    let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
        proposers,
        proxy_leaders,
        acceptors,
        &proposer_tick,
        &proxy_leader_tick,
        &acceptor_tick,
        c_to_proposers,
        a_checkpoint,
        p_ballot.clone(),
        p_is_leader,
        p_relevant_p1bs,
        config,
        a_max_ballot,
        nondet!(
            /// The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
            /// we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
            /// The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
            /// of acceptors, which in the worst case will lead to dropped payloads as documented.
            nondet_commit_leader_change
        ),
    );

    // Close the cycles declared above.
    a_log_complete_cycle.complete(a_log.snapshot(nondet!(
        /// We will always write payloads to the log before acknowledging them to the proposers,
        /// which guarantees that if the leader changes the quorum overlap between sequencing and leader
        /// election will include the committed value.
    )));
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        // Only tell the clients once when leader election concludes
        just_became_leader.then(p_ballot).all_ticks(),
        p_to_replicas,
    )
}
209
210#[expect(
211    clippy::type_complexity,
212    clippy::too_many_arguments,
213    reason = "internal paxos code // TODO"
214)]
215fn sequence_payload<'a, P: PaxosPayload>(
216    proposers: &Cluster<'a, Proposer>,
217    proxy_leaders: &Cluster<'a, ProxyLeader>,
218    acceptors: &Cluster<'a, Acceptor>,
219    proposer_tick: &Tick<Cluster<'a, Proposer>>,
220    proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
221    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
222    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
223    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
224
225    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
226    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
227
228    p_relevant_p1bs: Stream<
229        (Option<usize>, HashMap<usize, LogValue<P>>),
230        Tick<Cluster<'a, Proposer>>,
231        Bounded,
232        NoOrder,
233    >,
234    config: CompartmentalizedPaxosConfig,
235    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
236    nondet_commit_leader_change: NonDet,
237) -> (
238    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
239    Singleton<
240        (Option<usize>, HashMap<usize, LogValue<P>>),
241        Atomic<Cluster<'a, Acceptor>>,
242        Unbounded,
243    >,
244    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
245) {
246    let (p_log_to_recommit, p_max_slot) =
247        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);
248
249    let p_indexed_payloads = index_payloads(
250        proposer_tick,
251        p_max_slot,
252        c_to_proposers
253            .batch(
254                proposer_tick,
255                nondet!(
256                    /// We batch payloads so that we can compute the correct slot based on
257                    /// base slot. In the case of a leader re-election, the base slot is updated which
258                    /// affects the computed payload slots. This non-determinism can lead to non-determinism
259                    /// in which payloads are committed when the leader is changing, which is documented at
260                    /// the function level.
261                    nondet_commit_leader_change
262                ),
263            )
264            .continue_if(p_is_leader.clone()),
265    );
266
267    let num_proxy_leaders = config.num_proxy_leaders;
268    let p_to_proxy_leaders_p2a = p_indexed_payloads
269        .cross_singleton(p_ballot.clone())
270        .map(q!(move |((slot, payload), ballot)| (
271            MemberId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
272            ((slot, ballot), Some(payload))
273        )))
274        .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
275            MemberId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
276            ((slot, ballot), payload)
277        ))))
278        .all_ticks()
279        .demux_bincode(proxy_leaders)
280        .values();
281
282    // Send to a specific acceptor row
283    let num_acceptor_rows = config.acceptor_grid_rows;
284    let num_acceptor_cols = config.acceptor_grid_cols;
285    let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
286        .clone()
287        .flat_map_unordered(q!(move |((slot, ballot), payload)| {
288            let row = slot % num_acceptor_rows;
289            let mut p2as = Vec::new();
290            for i in 0..num_acceptor_cols {
291                p2as.push((
292                    MemberId::<Acceptor>::from_raw((row * num_acceptor_cols + i) as u32),
293                    P2a {
294                        sender: MemberId::<ProxyLeader>::from_raw(
295                            (slot % num_proxy_leaders) as u32,
296                        ),
297                        slot,
298                        ballot,
299                        value: payload.clone(),
300                    },
301                ));
302            }
303            p2as
304        }))
305        .demux_bincode(acceptors)
306        .values();
307
308    let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
309        acceptor_tick,
310        a_max_ballot.clone(),
311        pl_to_acceptors_p2a_thrifty,
312        a_checkpoint,
313        proxy_leaders,
314    );
315
316    // TODO: This is a liveness problem if any node in the thrifty quorum fails
317    // Need special operator for per-value timeout detection
318    let (quorums, fails) = collect_quorum(
319        a_to_proxy_leaders_p2b.atomic(proxy_leader_tick),
320        config.acceptor_grid_cols,
321        config.acceptor_grid_cols,
322    );
323
324    let pl_to_replicas = join_responses(
325        proxy_leader_tick,
326        quorums.map(q!(|k| (k, ()))),
327        p_to_proxy_leaders_p2a.batch(proxy_leader_tick, nondet!(/** TODO */)),
328    );
329
330    let pl_failed_p2b_to_proposer = fails
331        .map(q!(|(_, ballot)| (ballot.proposer_id, ballot)))
332        .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
333        .end_atomic()
334        .demux_bincode(proposers)
335        .values();
336
337    (
338        pl_to_replicas
339            .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
340            .end_atomic(),
341        a_log,
342        pl_failed_p2b_to_proposer,
343    )
344}