Skip to main content

hydro_test/cluster/
compartmentalized_paxos.rs

1use std::collections::HashMap;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::{Atomic, Location, MemberId};
5use hydro_lang::prelude::*;
6use hydro_std::quorum::collect_quorum;
7use hydro_std::request_response::join_responses;
8use serde::{Deserialize, Serialize};
9
10use super::paxos::{
11    Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
12    index_payloads, leader_election, recommit_after_leader_election,
13};
14use super::paxos_with_client::PaxosLike;
15
/// Marker type for the proxy-leader role in Compartmentalized Paxos.
///
/// Proxy leaders sit between proposers and acceptors, offloading the fan-out
/// of p2a messages and the collection of p2b quorum responses.
#[derive(Serialize, Deserialize, Clone)]
pub struct ProxyLeader {}
18
/// Configuration for Compartmentalized Paxos: the base Paxos parameters plus
/// the sizes of the proxy-leader cluster and the acceptor grid.
#[derive(Clone, Copy)]
pub struct CompartmentalizedPaxosConfig {
    /// Configuration of the underlying Paxos protocol (e.g. fault tolerance `f`).
    pub paxos_config: PaxosConfig,
    /// Number of proxy leaders; payloads are partitioned across them by `slot % num_proxy_leaders`.
    pub num_proxy_leaders: usize,
    /// Number of rows in the acceptor grid. Each row represents a write quorum (for sending p2as).
    pub acceptor_grid_rows: usize,
    /// Number of columns in the acceptor grid. Each column represents a read quorum (for waiting for p1bs).
    pub acceptor_grid_cols: usize,
    /// Number of state-machine replicas. NOTE(review): not referenced in this file — presumably consumed by callers.
    pub num_replicas: usize,
    /// How long to wait before resending message to a different write quorum
    // NOTE(review): also not referenced in this file; see the liveness TODO in `sequence_payload`.
    pub acceptor_retry_timeout: u64,
}
31
/// Handle bundling the three clusters that participate in Compartmentalized
/// Paxos (proposers, proxy leaders, and the acceptor grid) together with the
/// quorum/grid configuration. Implements [`PaxosLike`] so clients can drive it
/// like ordinary Paxos.
pub struct CoreCompartmentalizedPaxos<'a> {
    pub proposers: Cluster<'a, Proposer>,
    pub proxy_leaders: Cluster<'a, ProxyLeader>,
    pub acceptors: Cluster<'a, Acceptor>,
    pub config: CompartmentalizedPaxosConfig,
}
38
impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
    // Clients submit payloads to the proposers...
    type PaxosIn = Proposer;
    // ...the acceptors store the replicated log...
    type PaxosLog = Acceptor;
    // ...and sequenced output is emitted at the proxy leaders (unlike plain
    // Paxos, where the proposers emit it).
    type PaxosOut = ProxyLeader;
    type Ballot = Ballot;

    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
        &self.proposers
    }

    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
        &self.acceptors
    }

    /// Routes a ballot back to the proposer that issued it.
    fn get_recipient_from_ballot<L: Location<'a>>(
        ballot: Optional<Self::Ballot, L, Unbounded>,
    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
        ballot.map(q!(|ballot| ballot.proposer_id))
    }

    /// Runs the protocol by delegating to [`compartmentalized_paxos_core`],
    /// keeping only the sequenced-payload stream (`.1`) and discarding the
    /// leader-ballot stream (`.0`).
    fn build<P: PaxosPayload>(
        self,
        with_ballot: impl FnOnce(
            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
        nondet_leader: NonDet,
        nondet_commit: NonDet,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
        compartmentalized_paxos_core(
            &self.proposers,
            &self.proxy_leaders,
            &self.acceptors,
            a_checkpoint,
            with_ballot,
            self.config,
            nondet_leader,
            nondet_commit,
        )
        .1
    }
}
81
/// Implements the Compartmentalized Paxos algorithm as described in "Scaling Replicated State Machines with Compartmentalization",
/// which augments regular Paxos with a cluster of Proxy Leaders.
///
/// Proposers that wish to broadcast p2as to acceptors or collect p2bs from acceptors instead
/// go through the Proxy Leaders, which offload networking. The slot is used to determine which Proxy Leader to offload to.
/// Acceptors are arranged into a grid, where each row and column must have at least f+1 members.
/// Rows represent "write quorums"; an entire row of acceptors must confirm a payload before it is committed.
/// Columns represent "read quorums"; an entire column of acceptors must respond to a p1b before a proposer is elected the leader.
/// Read and write quorums were introduced in "Flexible Paxos: Quorum Intersection Revisited".
///
/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
/// and a stream of sequenced payloads with an index and optional payload (in the case of
/// holes in the log).
///
/// # Non-Determinism
/// When the leader is stable, the algorithm will commit incoming payloads to the leader
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
pub fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    proxy_leaders: &Cluster<'a, ProxyLeader>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: CompartmentalizedPaxosConfig,
    nondet_leader: NonDet,
    nondet_commit_leader_change: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
) {
    // Startup banners so each cluster's processes announce they are alive.
    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    proxy_leaders
        .source_iter(q!(["Proxy leaders say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let proxy_leader_tick = proxy_leaders.tick();
    let acceptor_tick = acceptors.tick();

    // The dataflow is cyclic: leader election needs the max ballot observed
    // during sequencing, and sequencing feeds the acceptor log that leader
    // election reads. Forward references break the cycles; they are completed
    // with `.complete(...)` near the end of this function.
    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    // Phase 1: elect a leader. A read quorum is one grid column, which has
    // `acceptor_grid_rows` members; the total acceptor count is rows * cols.
    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        config.acceptor_grid_rows,
        config.acceptor_grid_rows * config.acceptor_grid_cols,
        config.paxos_config,
        sequencing_max_ballot_forward_reference,
        a_log_forward_reference,
        nondet!(
            /// The primary non-determinism exposed by leader election algorithm lies in which leader
            /// is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
            /// or leader flag will only lead to failure in sequencing rather than committing the wrong value.
            nondet_leader
        ),
        nondet!(
            /// Because ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
            /// guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
        ),
    );

    // Edge-detect the leader flag: true on exactly the first tick where this
    // proposer becomes leader (was not leader on the previous tick).
    let was_not_leader = p_is_leader
        .clone()
        .map(q!(|is_leader| is_leader.then_some(())))
        .into_optional()
        .defer_tick()
        .is_none();
    let just_became_leader = p_is_leader.clone().and(was_not_leader);

    // Announce the winning ballot to clients once (on the election tick) and
    // let the caller produce the payload stream in response.
    let c_to_proposers = c_to_proposers(
        p_ballot
            .clone()
            .filter_if(just_became_leader.clone())
            .all_ticks(),
    );

    // Phase 2: sequence payloads through the proxy leaders and acceptor grid.
    let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
        proposers,
        proxy_leaders,
        acceptors,
        &proposer_tick,
        &proxy_leader_tick,
        &acceptor_tick,
        c_to_proposers,
        a_checkpoint,
        p_ballot.clone(),
        p_is_leader,
        p_relevant_p1bs,
        config,
        a_max_ballot,
        nondet!(
            /// The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
            /// we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
            /// The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
            /// of acceptors, which in the worst case will lead to dropped payloads as documented.
            nondet_commit_leader_change
        ),
    );

    // Close the cycles opened above.
    a_log_complete_cycle.complete(a_log.snapshot_atomic(
        &acceptor_tick,
        nondet!(
            /// We will always write payloads to the log before acknowledging them to the proposers,
            /// which guarantees that if the leader changes the quorum overlap between sequencing and leader
            /// election will include the committed value.
        ),
    ));
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        // Only tell the clients once when leader election concludes
        p_ballot.filter_if(just_became_leader).all_ticks(),
        p_to_replicas,
    )
}
218
/// Phase 2 of Compartmentalized Paxos: assigns slots to payloads at the
/// proposers, fans p2as out through the proxy leaders to a write-quorum row of
/// the acceptor grid, and collects p2b quorums back at the proxy leaders.
///
/// Returns, in order:
/// 1. the committed `(slot, Option<payload>)` stream at the proxy leaders
///    (`None` fills re-committed holes in the log),
/// 2. the acceptor-side log singleton (fed back into leader election),
/// 3. ballots from rejected p2bs, sent to the corresponding proposer so it
///    learns it has been preempted.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
fn sequence_payload<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    proxy_leaders: &Cluster<'a, ProxyLeader>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,

    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,

    p_relevant_p1bs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    config: CompartmentalizedPaxosConfig,
    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    nondet_commit_leader_change: NonDet,
) -> (
    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    // After winning an election, re-commit values recovered from p1b logs.
    let (p_log_to_recommit, p_max_slot) =
        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);

    // Assign log slots (starting after p_max_slot) to fresh client payloads,
    // but only while this proposer believes it is the leader.
    let p_indexed_payloads = index_payloads(
        p_max_slot,
        c_to_proposers
            .batch(
                proposer_tick,
                nondet!(
                    /// We batch payloads so that we can compute the correct slot based on
                    /// base slot. In the case of a leader re-election, the base slot is updated which
                    /// affects the computed payload slots. This non-determinism can lead to non-determinism
                    /// in which payloads are committed when the leader is changing, which is documented at
                    /// the function level.
                    nondet_commit_leader_change
                ),
            )
            .filter_if(p_is_leader.clone()),
    );

    // Partition work across proxy leaders by slot: both fresh payloads and
    // recommits for slot `s` go to proxy leader `s % num_proxy_leaders`.
    let num_proxy_leaders = config.num_proxy_leaders;
    let p_to_proxy_leaders_p2a = p_indexed_payloads
        .cross_singleton(p_ballot.clone())
        .map(q!(move |((slot, payload), ballot)| (
            MemberId::<ProxyLeader>::from_raw_id((slot % num_proxy_leaders) as u32),
            ((slot, ballot), Some(payload))
        )))
        .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
            MemberId::<ProxyLeader>::from_raw_id((slot % num_proxy_leaders) as u32),
            ((slot, ballot), payload)
        ))))
        .all_ticks()
        .demux(proxy_leaders, TCP.fail_stop().bincode())
        .values()
        .atomic();

    // Send to a specific acceptor row
    // (thrifty: row `slot % rows` is the single write quorum contacted; its
    // members are the `num_acceptor_cols` acceptors at ids row*cols..row*cols+cols).
    let num_acceptor_rows = config.acceptor_grid_rows;
    let num_acceptor_cols = config.acceptor_grid_cols;
    let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
        .clone()
        .flat_map_unordered(q!(move |((slot, ballot), payload)| {
            let row = slot % num_acceptor_rows;
            (0..num_acceptor_cols).map(move |i| {
                (
                    MemberId::<Acceptor>::from_raw_id((row * num_acceptor_cols + i) as u32),
                    P2a {
                        sender: MemberId::<ProxyLeader>::from_raw_id(
                            (slot % num_proxy_leaders) as u32,
                        ),
                        slot,
                        ballot: ballot.clone(),
                        value: payload.clone(),
                    },
                )
            })
        }))
        .end_atomic()
        .demux(acceptors, TCP.fail_stop().bincode())
        .values();

    // Acceptors accept/reject p2as against their max ballot and reply with
    // p2bs to the sending proxy leader.
    let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
        acceptor_tick,
        a_max_ballot.clone(),
        pl_to_acceptors_p2a_thrifty,
        a_checkpoint,
        proxy_leaders,
    );

    // TODO: This is a liveness problem if any node in the thrifty quorum fails
    // Need special operator for per-value timeout detection
    // A write quorum is a full row, i.e. all `acceptor_grid_cols` members must
    // respond (hence min == max == cols).
    let (quorums, fails) = collect_quorum(
        a_to_proxy_leaders_p2b,
        config.acceptor_grid_cols,
        config.acceptor_grid_cols,
    );

    // Re-attach each quorum-committed (slot, ballot) to its payload metadata.
    let pl_to_replicas = join_responses(
        quorums.map(q!(|k| (k, ()))),
        p_to_proxy_leaders_p2a.batch_atomic(
            proxy_leader_tick,
            nondet!(
                /// The metadata will always be generated before we get a quorum
                /// because our batch of `p_to_proxy_leaders_p2a` is at least after
                /// what we sent to the acceptors.
            ),
        ),
    );

    // Rejected p2bs carry the higher ballot; forward each to that ballot's
    // proposer so it learns it has been preempted.
    let pl_failed_p2b_to_proposer = fails
        .map(q!(|(_, ballot)| (ballot.proposer_id.clone(), ballot)))
        .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
        .demux(proposers, TCP.fail_stop().bincode())
        .values();

    (
        pl_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))),
        a_log,
        pl_failed_p2b_to_proposer,
    )
}
355}