hydro_test/cluster/
compartmentalized_paxos.rs

1use std::collections::HashMap;
2
3use hydro_lang::*;
4use hydro_std::quorum::collect_quorum;
5use hydro_std::request_response::join_responses;
6use serde::{Deserialize, Serialize};
7
8use super::paxos::{
9    Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
10    index_payloads, leader_election, recommit_after_leader_election,
11};
12use super::paxos_with_client::PaxosLike;
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct ProxyLeader {}
16
17#[derive(Clone, Copy)]
18pub struct CompartmentalizedPaxosConfig {
19    pub paxos_config: PaxosConfig,
20    pub num_proxy_leaders: usize,
21    /// Number of rows in the acceptor grid. Each row represents a write quorum (for sending p2as).
22    pub acceptor_grid_rows: usize,
23    /// Number of columns in the acceptor grid. Each column represents a read quorum (for waiting for p1bs).
24    pub acceptor_grid_cols: usize,
25    pub num_replicas: usize,
26    /// How long to wait before resending message to a different write quorum
27    pub acceptor_retry_timeout: u64,
28}
29
/// Bundles the three clusters and the configuration needed to build a
/// Compartmentalized Paxos instance through the [`PaxosLike`] trait.
pub struct CoreCompartmentalizedPaxos<'a> {
    /// Cluster of proposers; this is where incoming payloads are delivered.
    pub proposers: Cluster<'a, Proposer>,
    /// Cluster of proxy leaders; sequenced payloads are emitted from here.
    pub proxy_leaders: Cluster<'a, ProxyLeader>,
    /// Cluster of acceptors; these hold the log.
    pub acceptors: Cluster<'a, Acceptor>,
    /// Grid dimensions, proxy leader count and base Paxos settings.
    pub config: CompartmentalizedPaxosConfig,
}
36
37impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
38    type PaxosIn = Proposer;
39    type PaxosLog = Acceptor;
40    type PaxosOut = ProxyLeader;
41    type Ballot = Ballot;
42
43    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
44        &self.proposers
45    }
46
47    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
48        &self.acceptors
49    }
50
51    fn get_recipient_from_ballot<L: Location<'a>>(
52        ballot: Optional<Self::Ballot, L, Unbounded>,
53    ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded> {
54        ballot.map(q!(|ballot| ballot.proposer_id))
55    }
56
57    unsafe fn build<P: PaxosPayload>(
58        self,
59        with_ballot: impl FnOnce(
60            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
61        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
62        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
63    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
64        unsafe {
65            compartmentalized_paxos_core(
66                &self.proposers,
67                &self.proxy_leaders,
68                &self.acceptors,
69                a_checkpoint,
70                with_ballot,
71                self.config,
72            )
73            .1
74        }
75    }
76}
77
78/// Implements the Compartmentalized Paxos algorithm as described in "Scaling Replicated State Machines with Compartmentalization",
79/// which augments regular Paxos with a cluster of Proxy Leaders.
80///
81/// Proposers that wish to broadcast p2as to acceptors or collect p2bs from acceptors instead
82/// go through the Proxy Leaders, which offload networking. The slot is used to determine which Proxy Leader to offload to.
83/// Acceptors are arranged into a grid, where each row and column must have at least f+1 members.
84/// Rows represent "write quorums"; an entire row of acceptors must confirm a payload before it is committed.
85/// Columns represent "read quorums"; an entire column of acceptors must respond to a p1b before a proposer is elected the leader.
86/// Read and write quorums were introduced in "Flexible Paxos: Quorum Intersection Revisited".
87///
88/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
89/// and a stream of sequenced payloads with an index and optional payload (in the case of
90/// holes in the log).
91///
92/// # Safety
93/// When the leader is stable, the algorithm will commit incoming payloads to the leader
94/// in deterministic order. However, when the leader is changing, payloads may be
95/// non-deterministically dropped. The stream of ballots is also non-deterministic because
96/// leaders are elected in a non-deterministic process.
97#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
98pub unsafe fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
99    proposers: &Cluster<'a, Proposer>,
100    proxy_leaders: &Cluster<'a, ProxyLeader>,
101    acceptors: &Cluster<'a, Acceptor>,
102    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
103    c_to_proposers: impl FnOnce(
104        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
105    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
106    config: CompartmentalizedPaxosConfig,
107) -> (
108    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
109    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
110) {
111    proposers
112        .source_iter(q!(["Proposers say hello"]))
113        .for_each(q!(|s| println!("{}", s)));
114
115    proxy_leaders
116        .source_iter(q!(["Proxy leaders say hello"]))
117        .for_each(q!(|s| println!("{}", s)));
118
119    acceptors
120        .source_iter(q!(["Acceptors say hello"]))
121        .for_each(q!(|s| println!("{}", s)));
122
123    let proposer_tick = proposers.tick();
124    let proxy_leader_tick = proxy_leaders.tick();
125    let acceptor_tick = acceptors.tick();
126
127    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
128        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
129    let (a_log_complete_cycle, a_log_forward_reference) =
130        acceptor_tick.forward_ref::<Singleton<_, _, _>>();
131
132    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe {
133        // SAFETY: The primary non-determinism exposed by leader election algorithm lies in which leader
134        // is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
135        // or leader flag will only lead to failure in sequencing rather than commiting the wrong value. Because
136        // ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
137        // guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
138        leader_election(
139            proposers,
140            acceptors,
141            &proposer_tick,
142            &acceptor_tick,
143            config.acceptor_grid_rows,
144            config.acceptor_grid_rows * config.acceptor_grid_cols,
145            config.paxos_config,
146            sequencing_max_ballot_forward_reference,
147            a_log_forward_reference,
148        )
149    };
150
151    let just_became_leader = p_is_leader
152        .clone()
153        .continue_unless(p_is_leader.clone().defer_tick());
154
155    let c_to_proposers = c_to_proposers(
156        just_became_leader
157            .clone()
158            .then(p_ballot.clone())
159            .all_ticks(),
160    );
161
162    let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe {
163        // SAFETY: The relevant p1bs are non-deterministic because they come from a arbitrary quorum, but because
164        // we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
165        // The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
166        // of acceptors, which in the worst case will lead to dropped payloads as documented.
167        sequence_payload(
168            proposers,
169            proxy_leaders,
170            acceptors,
171            &proposer_tick,
172            &proxy_leader_tick,
173            &acceptor_tick,
174            c_to_proposers,
175            a_checkpoint,
176            p_ballot.clone(),
177            p_is_leader,
178            p_relevant_p1bs,
179            config,
180            a_max_ballot,
181        )
182    };
183
184    a_log_complete_cycle.complete(unsafe {
185        // SAFETY: We will always write payloads to the log before acknowledging them to the proposers,
186        // which guarantees that if the leader changes the quorum overlap between sequencing and leader
187        // election will include the committed value.
188        a_log.latest_tick()
189    });
190    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
191
192    (
193        // Only tell the clients once when leader election concludes
194        just_became_leader.then(p_ballot).all_ticks(),
195        p_to_replicas,
196    )
197}
198
199#[expect(
200    clippy::type_complexity,
201    clippy::too_many_arguments,
202    reason = "internal paxos code // TODO"
203)]
204unsafe fn sequence_payload<'a, P: PaxosPayload>(
205    proposers: &Cluster<'a, Proposer>,
206    proxy_leaders: &Cluster<'a, ProxyLeader>,
207    acceptors: &Cluster<'a, Acceptor>,
208    proposer_tick: &Tick<Cluster<'a, Proposer>>,
209    proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
210    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
211    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
212    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
213
214    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
215    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
216
217    p_relevant_p1bs: Stream<
218        (Option<usize>, HashMap<usize, LogValue<P>>),
219        Tick<Cluster<'a, Proposer>>,
220        Bounded,
221        NoOrder,
222    >,
223    config: CompartmentalizedPaxosConfig,
224    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
225) -> (
226    Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
227    Singleton<
228        (Option<usize>, HashMap<usize, LogValue<P>>),
229        Atomic<Cluster<'a, Acceptor>>,
230        Unbounded,
231    >,
232    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
233) {
234    let (p_log_to_recommit, p_max_slot) =
235        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);
236
237    let p_indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
238        // SAFETY: We batch payloads so that we can compute the correct slot based on
239        // base slot. In the case of a leader re-election, the base slot is updated which
240        // affects the computed payload slots. This non-determinism can lead to non-determinism
241        // in which payloads are committed when the leader is changing, which is documented at
242        // the function level.
243        c_to_proposers
244            .tick_batch(proposer_tick)
245            .continue_if(p_is_leader.clone())
246    });
247
248    let num_proxy_leaders = config.num_proxy_leaders;
249    let p_to_proxy_leaders_p2a = p_indexed_payloads
250        .cross_singleton(p_ballot.clone())
251        .map(q!(move |((slot, payload), ballot)| (
252            ClusterId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
253            ((slot, ballot), Some(payload))
254        )))
255        .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
256            ClusterId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
257            ((slot, ballot), payload)
258        ))))
259        .all_ticks()
260        .send_bincode_anonymous(proxy_leaders);
261
262    // Send to a specific acceptor row
263    let num_acceptor_rows = config.acceptor_grid_rows;
264    let num_acceptor_cols = config.acceptor_grid_cols;
265    let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
266        .clone()
267        .flat_map_unordered(q!(move |((slot, ballot), payload)| {
268            let row = slot % num_acceptor_rows;
269            let mut p2as = Vec::new();
270            for i in 0..num_acceptor_cols {
271                p2as.push((
272                    ClusterId::<Acceptor>::from_raw((row * num_acceptor_cols + i) as u32),
273                    P2a {
274                        sender: ClusterId::<ProxyLeader>::from_raw(
275                            (slot % num_proxy_leaders) as u32,
276                        ),
277                        slot,
278                        ballot,
279                        value: payload.clone(),
280                    },
281                ));
282            }
283            p2as
284        }))
285        .send_bincode_anonymous(acceptors);
286
287    let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
288        acceptor_tick,
289        a_max_ballot.clone(),
290        pl_to_acceptors_p2a_thrifty,
291        a_checkpoint,
292        proxy_leaders,
293    );
294
295    // TODO: This is a liveness problem if any node in the thrifty quorum fails
296    // Need special operator for per-value timeout detection
297    let (quorums, fails) = collect_quorum(
298        a_to_proxy_leaders_p2b.atomic(proxy_leader_tick),
299        config.acceptor_grid_cols,
300        config.acceptor_grid_cols,
301    );
302
303    let pl_to_replicas = join_responses(proxy_leader_tick, quorums.map(q!(|k| (k, ()))), unsafe {
304        p_to_proxy_leaders_p2a.tick_batch(proxy_leader_tick)
305    });
306
307    let pl_failed_p2b_to_proposer = fails
308        .map(q!(|(_, ballot)| (ballot.proposer_id, ballot)))
309        .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
310        .send_bincode_anonymous(proposers);
311
312    (
313        pl_to_replicas
314            .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
315            .end_atomic(),
316        a_log,
317        pl_failed_p2b_to_proposer,
318    )
319}