hydro_test/cluster/paxos.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::time::Duration;

use hydro_lang::*;
use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
use hydro_std::request_response::join_responses;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use super::paxos_with_client::PaxosLike;

#[derive(Serialize, Deserialize, Clone)]
pub struct Proposer {}
pub struct Acceptor {}

#[derive(Clone, Copy)]
pub struct PaxosConfig {
    /// Maximum number of faulty nodes that can be tolerated
    pub f: usize,
    /// How often (in seconds) to send "I am leader" heartbeats
    pub i_am_leader_send_timeout: u64,
    /// How often (in seconds) to check if the leader has expired
    pub i_am_leader_check_timeout: u64,
    /// Initial delay, multiplied by the proposer's ID, to stagger proposers checking for timeouts
    pub i_am_leader_check_timeout_delay_multiplier: usize,
}
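
#[cfg(test)]
mod paxos_config_example {
    use super::*;

    // Illustrative sketch (an added example, not part of the original protocol
    // code): with f = 1, Paxos needs 2f + 1 = 3 acceptors and quorums of size
    // f + 1 = 2, matching the sizes `paxos_core` passes to `leader_election`.
    // The timeout values below are arbitrary placeholders, not defaults.
    #[test]
    fn quorum_sizes_follow_from_f() {
        let config = PaxosConfig {
            f: 1,
            i_am_leader_send_timeout: 1,
            i_am_leader_check_timeout: 5,
            i_am_leader_check_timeout_delay_multiplier: 1,
        };
        assert_eq!(config.f + 1, 2); // quorum size
        assert_eq!(2 * config.f + 1, 3); // total acceptors
    }
}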

pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}

#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug, Hash)]
pub struct Ballot {
    pub num: u32,
    pub proposer_id: ClusterId<Proposer>,
}

impl Ord for Ballot {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.num
            .cmp(&other.num)
            .then_with(|| self.proposer_id.raw_id.cmp(&other.proposer_id.raw_id))
    }
}

impl PartialOrd for Ballot {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
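
#[cfg(test)]
mod ballot_ordering_example {
    use super::*;

    // Illustrative sketch (an added example): `Ballot` is ordered by `num`
    // first, with `proposer_id` breaking ties, so two distinct proposers can
    // never mint equal ballots and leader election always has a total order.
    #[test]
    fn num_dominates_and_proposer_id_breaks_ties() {
        let low = Ballot { num: 1, proposer_id: ClusterId::from_raw(9) };
        let high = Ballot { num: 2, proposer_id: ClusterId::from_raw(0) };
        assert!(low < high); // the ballot number dominates the proposer id

        let a = Ballot { num: 2, proposer_id: ClusterId::from_raw(0) };
        let b = Ballot { num: 2, proposer_id: ClusterId::from_raw(1) };
        assert!(a < b); // same number, so the proposer id decides
    }
}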

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LogValue<P> {
    pub ballot: Ballot,
    pub value: Option<P>, // might be a hole
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct P2a<P, S> {
    pub sender: ClusterId<S>,
    pub ballot: Ballot,
    pub slot: usize,
    pub value: Option<P>, // might be a re-committed hole
}

pub struct CorePaxos<'a> {
    pub proposers: Cluster<'a, Proposer>,
    pub acceptors: Cluster<'a, Acceptor>,
    pub paxos_config: PaxosConfig,
}

impl<'a> PaxosLike<'a> for CorePaxos<'a> {
    type PaxosIn = Proposer;
    type PaxosLog = Acceptor;
    type PaxosOut = Proposer;
    type Ballot = Ballot;

    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
        &self.proposers
    }

    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
        &self.acceptors
    }

    fn get_recipient_from_ballot<L: Location<'a>>(
        ballot: Optional<Self::Ballot, L, Unbounded>,
    ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded> {
        ballot.map(q!(|ballot| ballot.proposer_id))
    }

    unsafe fn build<P: PaxosPayload>(
        self,
        with_ballot: impl FnOnce(
            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
        unsafe {
            paxos_core(
                &self.proposers,
                &self.acceptors,
                a_checkpoint,
                with_ballot,
                self.paxos_config,
            )
            .1
        }
    }
}

/// Implements the core Paxos algorithm, which uses a cluster of proposers and acceptors
/// to sequence payloads being sent to the proposers.
///
/// A proposer that is currently the leader will work with acceptors to sequence incoming
/// payloads, but may drop payloads if it is not the leader or loses leadership during
/// the commit process.
///
/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
/// and a stream of sequenced payloads with an index and optional payload (in the case of
/// holes in the log).
///
/// # Safety
/// When the leader is stable, the algorithm will commit incoming payloads to the leader
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub unsafe fn paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: PaxosConfig,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let f = config.f;

    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let acceptor_tick = acceptors.tick();

    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe {
        // SAFETY: The primary non-determinism exposed by the leader election algorithm lies in which leader
        // is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
        // or leader flag will only lead to failure in sequencing rather than committing the wrong value. Because
        // ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
        // guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
        leader_election(
            proposers,
            acceptors,
            &proposer_tick,
            &acceptor_tick,
            f + 1,
            2 * f + 1,
            config,
            sequencing_max_ballot_forward_reference,
            a_log_forward_reference,
        )
    };

    let just_became_leader = p_is_leader
        .clone()
        .continue_unless(p_is_leader.clone().defer_tick());

    let c_to_proposers = c_to_proposers(
        just_became_leader
            .clone()
            .then(p_ballot.clone())
            .all_ticks(),
    );

    let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe {
        // SAFETY: The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
        // we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
        // The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
        // of acceptors, which in the worst case will lead to dropped payloads as documented.
        sequence_payload(
            proposers,
            acceptors,
            &proposer_tick,
            &acceptor_tick,
            c_to_proposers,
            a_checkpoint,
            p_ballot.clone(),
            p_is_leader,
            p_relevant_p1bs,
            f,
            a_max_ballot,
        )
    };

    a_log_complete_cycle.complete(unsafe {
        // SAFETY: We will always write payloads to the log before acknowledging them to the proposers,
        // which guarantees that if the leader changes the quorum overlap between sequencing and leader
        // election will include the committed value.
        a_log.latest_tick()
    });
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        // Only tell the clients once when leader election concludes
        just_became_leader.then(p_ballot).all_ticks(),
        p_to_replicas,
    )
}

#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    clippy::missing_safety_doc,
    reason = "internal paxos code // TODO"
)]
pub unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    quorum_size: usize,
    num_quorum_participants: usize,
    paxos_config: PaxosConfig,
    p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
    let (p1b_fail_complete, p1b_fail) =
        proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
        proposers.forward_ref::<Stream<_, _, _, NoOrder>>();
    let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
        proposer_tick.forward_ref::<Optional<(), _, _>>();
    // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b)));
    // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot)));
    // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));

    let p_received_max_ballot = p1b_fail
        .union(p_received_p2b_ballots)
        .union(p_to_proposers_i_am_leader_forward_ref)
        .max()
        .unwrap_or(proposers.singleton(q!(Ballot {
            num: 0,
            proposer_id: ClusterId::from_raw(0)
        })));

    let (p_ballot, p_has_largest_ballot) = p_ballot_calc(proposer_tick, unsafe {
        // SAFETY: A stale max ballot might result in us failing to become the leader, but which proposer
        // becomes the leader is non-deterministic anyway.
        p_received_max_ballot.latest_tick(proposer_tick)
    });

    let (p_to_proposers_i_am_leader, p_trigger_election) = unsafe {
        // SAFETY: non-determinism in heartbeats may lead to additional leader election attempts, which
        // is propagated to the non-determinism of which leader is elected.
        p_leader_heartbeat(
            proposers,
            proposer_tick,
            p_is_leader_forward_ref,
            p_ballot.clone(),
            paxos_config,
        )
    };

    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);

    let p_to_acceptors_p1a = p_trigger_election
        .then(p_ballot.clone())
        .all_ticks()
        .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
        .broadcast_bincode_anonymous(acceptors);

    let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
        acceptor_tick,
        unsafe {
            // SAFETY: Non-deterministic batching may result in different payloads being rejected
            // by an acceptor if the payload is batched with another payload with larger ballot.
            // But as documented, payloads may be non-deterministically dropped during leader election.
            p_to_acceptors_p1a.tick_batch(acceptor_tick)
        },
        a_log,
        proposers,
    );

    let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
        proposer_tick,
        a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
        p_ballot.clone(),
        p_has_largest_ballot,
        quorum_size,
        num_quorum_participants,
    );
    p_is_leader_complete_cycle.complete(p_is_leader.clone());
    p1b_fail_complete.complete(fail_ballots.end_atomic());

    (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
}

// Proposer logic to calculate the next ballot number. Expects `p_received_max_ballot`,
// the largest ballot received so far. Outputs the proposer's current ballot and an
// optional token that is present only if this proposer holds the largest ballot.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_ballot_calc<'a>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let (p_ballot_num_complete_cycle, p_ballot_num) =
        proposer_tick.cycle_with_initial(proposer_tick.singleton(q!(0)));

    let p_new_ballot_num = p_received_max_ballot
        .clone()
        .zip(p_ballot_num.clone())
        .map(q!(move |(received_max_ballot, ballot_num)| {
            if received_max_ballot
                > (Ballot {
                    num: ballot_num,
                    proposer_id: CLUSTER_SELF_ID,
                })
            {
                received_max_ballot.num + 1
            } else {
                ballot_num
            }
        }));
    p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num);

    let p_ballot = p_ballot_num.map(q!(move |num| Ballot {
        num,
        proposer_id: CLUSTER_SELF_ID
    }));

    let p_has_largest_ballot = p_received_max_ballot
        .clone()
        .zip(p_ballot.clone())
        .filter(q!(
            |(received_max_ballot, cur_ballot)| *received_max_ballot <= *cur_ballot
        ))
        .map(q!(|_| ()));

    // End stable leader election
    (p_ballot, p_has_largest_ballot)
}
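
#[cfg(test)]
mod ballot_calc_example {
    use super::*;

    // Illustrative sketch of the ballot-number rule in `p_ballot_calc`,
    // re-stated as a plain function (`next_ballot_num` is a hypothetical
    // helper for exposition only): if a larger ballot has been observed,
    // jump one past its number; otherwise keep the current number.
    fn next_ballot_num(received_max: Ballot, my_num: u32, my_id: ClusterId<Proposer>) -> u32 {
        if received_max > (Ballot { num: my_num, proposer_id: my_id }) {
            received_max.num + 1
        } else {
            my_num
        }
    }

    #[test]
    fn bumps_past_larger_observed_ballots() {
        let me = ClusterId::from_raw(0);
        let larger = Ballot { num: 5, proposer_id: ClusterId::from_raw(1) };
        assert_eq!(next_ballot_num(larger, 3, me), 6);

        let smaller = Ballot { num: 2, proposer_id: ClusterId::from_raw(1) };
        assert_eq!(next_ballot_num(smaller, 3, me), 3);
    }
}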

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
unsafe fn p_leader_heartbeat<'a>(
    proposers: &Cluster<'a, Proposer>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    paxos_config: PaxosConfig,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
    let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
    let i_am_leader_check_timeout_delay_multiplier =
        paxos_config.i_am_leader_check_timeout_delay_multiplier;

    let p_to_proposers_i_am_leader = unsafe {
        // SAFETY: Delays in heartbeats may lead to leader election attempts even
        // if the leader is alive. This will result in the previous leader receiving
        // larger ballots from its peers and it will drop its leadership.
        p_is_leader
            .clone()
            .then(p_ballot)
            .latest()
            .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
    }
    .broadcast_bincode_anonymous(proposers);

    let p_leader_expired = unsafe {
        // SAFETY: Delayed timeouts only affect which leader wins re-election. If the
        // leadership flag is gained after a timeout, we correctly ignore the timeout;
        // if the flag is lost after a timeout, we correctly attempt to become the leader.
        p_to_proposers_i_am_leader
            .clone()
            .timeout(q!(Duration::from_secs(i_am_leader_check_timeout)))
            .latest_tick(proposer_tick)
            .continue_unless(p_is_leader)
    };

    // Add a delay proportional to the node ID so that not every proposer sends a P1a at the same time
    let p_trigger_election = unsafe {
        // SAFETY: If the leader "un-expires" due to non-deterministic delay, we return
        // to a stable leader state. If the leader remains expired, non-deterministic
        // delay is propagated to the non-determinism of which leader is elected.
        p_leader_expired.continue_if(
            proposers
                .source_interval_delayed(
                    q!(Duration::from_secs(
                        (CLUSTER_SELF_ID.raw_id
                            * i_am_leader_check_timeout_delay_multiplier as u32)
                            .into()
                    )),
                    q!(Duration::from_secs(i_am_leader_check_timeout)),
                )
                .tick_batch(proposer_tick)
                .first(),
        )
    };
    (p_to_proposers_i_am_leader, p_trigger_election)
}
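
#[cfg(test)]
mod election_stagger_example {
    // Illustrative sketch of the staggered election delay above: proposer i
    // waits i * i_am_leader_check_timeout_delay_multiplier seconds before its
    // first timeout check, so proposers do not all send P1as simultaneously.
    // The multiplier below is a hypothetical config value.
    #[test]
    fn delays_grow_with_proposer_id() {
        let multiplier: u32 = 10;
        let delays: Vec<u64> = (0u32..3)
            .map(|raw_id| u64::from(raw_id * multiplier))
            .collect();
        assert_eq!(delays, vec![0, 10, 20]);
    }
}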

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    proposers: &Cluster<'a, Proposer>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let a_max_ballot = p_to_acceptors_p1a
        .clone()
        .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
        .persist()
        .max()
        .unwrap_or(acceptor_tick.singleton(q!(Ballot {
            num: 0,
            proposer_id: ClusterId::from_raw(0)
        })));

    (
        a_max_ballot.clone(),
        p_to_acceptors_p1a
            .cross_singleton(a_max_ballot)
            .cross_singleton(a_log)
            .map(q!(|((ballot, max_ballot), log)| (
                ballot.proposer_id,
                (
                    ballot,
                    if ballot == max_ballot {
                        Ok(log)
                    } else {
                        Err(max_ballot)
                    }
                )
            )))
            .all_ticks()
            .send_bincode_anonymous(proposers),
    )
}
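
#[cfg(test)]
mod acceptor_p1_example {
    use super::*;

    // Illustrative sketch of the P1b decision above: an acceptor grants a
    // P1a only if it carries the highest ballot the acceptor has seen;
    // otherwise it rejects with that max ballot so the proposer can catch up.
    // (`decide` is a hypothetical stand-in for the dataflow logic.)
    #[test]
    fn p1a_granted_only_for_max_ballot() {
        let ballot = |num| Ballot { num, proposer_id: ClusterId::from_raw(0) };
        let max_ballot = ballot(3);
        let decide = |p1a: Ballot| -> Result<(), Ballot> {
            if p1a == max_ballot { Ok(()) } else { Err(max_ballot) }
        };
        assert_eq!(decide(ballot(3)), Ok(()));
        assert_eq!(decide(ballot(2)), Err(max_ballot));
    }
}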

// Proposer logic for processing p1bs: determines if the proposer is now the leader and
// collects, from a quorum of p1bs, the accepted log entries to reconcile (along with the
// ballots of any rejections). The maximum slot and hole-filling no-ops are computed
// later, in `recommit_after_leader_election`.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    a_to_proposers_p1b: Stream<
        (Ballot, Result<(Option<usize>, P), Ballot>),
        Cluster<'a, Proposer>,
        Unbounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_has_largest_ballot: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    quorum_size: usize,
    num_quorum_participants: usize,
) -> (
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Stream<Ballot, Atomic<Cluster<'a, Proposer>>, Unbounded, NoOrder>,
) {
    let (quorums, fails) = collect_quorum_with_response(
        a_to_proposers_p1b.atomic(proposer_tick),
        quorum_size,
        num_quorum_participants,
    );

    let p_received_quorum_of_p1bs = unsafe {
        // SAFETY: All the values for a quorum will be emitted in a single batch,
        // so we will not split up the quorum.
        quorums.tick_batch()
    }
    .persist()
    .fold_keyed_commutative(
        q!(|| vec![]),
        q!(|logs, log| {
            // even though this is non-commutative, we use `flatten_unordered` later
            logs.push(log);
        }),
    )
    .max_by_key(q!(|t| t.0))
    .zip(p_ballot.clone())
    .filter_map(q!(
        move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
            Some(quorum_accepted)
        } else {
            None
        }
    ));

    let p_is_leader = p_received_quorum_of_p1bs
        .clone()
        .map(q!(|_| ()))
        .continue_if(p_has_largest_ballot.clone());

    (
        p_is_leader,
        // we used an unordered accumulator, so flattened has no order
        p_received_quorum_of_p1bs.flatten_unordered(),
        fails.map(q!(|(_, ballot)| ballot)),
    )
}

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
    accepted_logs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    f: usize,
) -> (
    Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let p_p1b_max_checkpoint = accepted_logs
        .clone()
        .filter_map(q!(|(checkpoint, _log)| checkpoint))
        .max()
        .into_singleton();
    let p_p1b_highest_entries_and_count = accepted_logs
        .map(q!(|(_checkpoint, log)| log))
        .flatten_unordered() // Convert HashMap log back to stream
        .fold_keyed_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
            if let Some(curr_entry_payload) = &mut curr_entry.1 {
                let same_values = new_entry.value == curr_entry_payload.value;
                let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
                // Increment count if the values are the same
                if same_values {
                    curr_entry.0 += 1;
                }
                // Replace the ballot with the largest one
                if higher_ballot {
                    curr_entry_payload.ballot = new_entry.ballot;
                    // Replace the value with the one from the largest ballot, if necessary
                    if !same_values {
                        curr_entry.0 = 1;
                        curr_entry_payload.value = new_entry.value;
                    }
                }
            } else {
                *curr_entry = (1, Some(new_entry));
            }
        }))
        .map(q!(|(slot, (count, entry))| (slot, (count, entry.unwrap()))));
    let p_log_to_try_commit = p_p1b_highest_entries_and_count
        .clone()
        .cross_singleton(p_ballot.clone())
        .cross_singleton(p_p1b_max_checkpoint.clone())
        .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
            if count > f {
                return None;
            } else if let Some(checkpoint) = checkpoint {
                if slot <= checkpoint {
                    return None;
                }
            }
            Some(((slot, ballot), entry.value))
        }));
    let p_max_slot = p_p1b_highest_entries_and_count
        .clone()
        .map(q!(|(slot, _)| slot))
        .max();
    let p_proposed_slots = p_p1b_highest_entries_and_count
        .clone()
        .map(q!(|(slot, _)| slot));
    let p_log_holes = p_max_slot
        .clone()
        .zip(p_p1b_max_checkpoint)
        .flat_map_ordered(q!(|(max_slot, checkpoint)| {
            if let Some(checkpoint) = checkpoint {
                (checkpoint + 1)..max_slot
            } else {
                0..max_slot
            }
        }))
        .filter_not_in(p_proposed_slots)
        .cross_singleton(p_ballot.clone())
        .map(q!(move |(slot, ballot)| ((slot, ballot), None)));

    (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
}
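
#[cfg(test)]
mod recommit_holes_example {
    // Illustrative sketch of the hole-filling range in
    // `recommit_after_leader_election`: slots above the max checkpoint and
    // below the max accepted slot that no acceptor reported are re-proposed
    // with value `None`. The concrete slot numbers here are made up.
    #[test]
    fn holes_exclude_checkpointed_and_reported_slots() {
        let max_slot = 5usize;
        let checkpoint = Some(1usize);
        let reported = [2usize, 4];

        let range = match checkpoint {
            Some(cp) => (cp + 1)..max_slot,
            None => 0..max_slot,
        };
        let holes: Vec<usize> = range.filter(|slot| !reported.contains(slot)).collect();
        assert_eq!(holes, vec![3]); // slots 0..=1 are checkpointed; 2 and 4 were reported
    }
}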

#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
unsafe fn sequence_payload<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,

    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,

    p_relevant_p1bs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    f: usize,

    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (p_log_to_recommit, p_max_slot) =
        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);

    let indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
        // SAFETY: We batch payloads so that we can compute the correct slot based on
        // base slot. In the case of a leader re-election, the base slot is updated which
        // affects the computed payload slots. This non-determinism can lead to non-determinism
        // in which payloads are committed when the leader is changing, which is documented at
        // the function level.
        c_to_proposers
            .tick_batch(proposer_tick)
            .continue_if(p_is_leader.clone())
    });

    let payloads_to_send = indexed_payloads
        .cross_singleton(p_ballot.clone())
        .map(q!(|((slot, payload), ballot)| (
            (slot, ballot),
            Some(payload)
        )))
        .chain(p_log_to_recommit)
        .continue_if(p_is_leader)
        .all_ticks_atomic();

    let (a_log, a_to_proposers_p2b) = acceptor_p2(
        acceptor_tick,
        a_max_ballot.clone(),
        payloads_to_send
            .clone()
            .map(q!(move |((slot, ballot), value)| P2a {
                sender: CLUSTER_SELF_ID,
                ballot,
                slot,
                value
            }))
            .broadcast_bincode_anonymous(acceptors),
        a_checkpoint,
        proposers,
    );

    // TODO: only persist if we are the leader
    let (quorums, fails) =
        collect_quorum(a_to_proposers_p2b.atomic(proposer_tick), f + 1, 2 * f + 1);

    let p_to_replicas = join_responses(proposer_tick, quorums.map(q!(|k| (k, ()))), unsafe {
        // SAFETY: The metadata will always be generated before we get a quorum
        // because `payloads_to_send` is used to send the payloads to acceptors.
        payloads_to_send.tick_batch()
    });

    (
        p_to_replicas
            .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
            .end_atomic(),
        a_log,
        fails.map(q!(|(_, ballot)| ballot)).end_atomic(),
    )
}

#[derive(Clone)]
pub enum CheckpointOrP2a<P, S> {
    Checkpoint(usize),
    P2a(P2a<P, S>),
}

// Proposer logic to assign slots to incoming payloads, outputting (slot, payload) pairs
// and tracking the next available slot across ticks.
pub fn index_payloads<'a, P: PaxosPayload>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_max_slot: Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
    c_to_proposers: Stream<P, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> Stream<(usize, P), Tick<Cluster<'a, Proposer>>, Bounded> {
    let (p_next_slot_complete_cycle, p_next_slot) =
        proposer_tick.cycle_with_initial::<Singleton<usize, _, _>>(proposer_tick.singleton(q!(0)));
    let p_next_slot_after_reconciling_p1bs = p_max_slot.map(q!(|max_slot| max_slot + 1));

    let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot);

    let p_indexed_payloads = c_to_proposers
        .enumerate()
        .cross_singleton(base_slot.clone())
        .map(q!(|((index, payload), base_slot)| (
            base_slot + index,
            payload
        )));

    let p_num_payloads = p_indexed_payloads.clone().count();
    let p_next_slot_after_sending_payloads =
        p_num_payloads
            .clone()
            .zip(base_slot)
            .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));

    p_next_slot_complete_cycle.complete_next_tick(p_next_slot_after_sending_payloads);
    p_indexed_payloads
}
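
#[cfg(test)]
mod index_payloads_example {
    // Illustrative sketch of the slot arithmetic in `index_payloads`: each
    // payload in a batch gets `base_slot + index`, and the next tick's base
    // slot advances by the number of payloads in the batch.
    #[test]
    fn slots_are_contiguous_from_the_base_slot() {
        let base_slot = 10usize;
        let payloads = ["a", "b", "c"];
        let indexed: Vec<(usize, &str)> = payloads
            .iter()
            .enumerate()
            .map(|(index, payload)| (base_slot + index, *payload))
            .collect();
        assert_eq!(indexed, vec![(10, "a"), (11, "b"), (12, "c")]);
        // The cycle then resumes at base_slot + num_payloads on the next tick.
        assert_eq!(base_slot + payloads.len(), 13);
    }
}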

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    proposers: &Cluster<'a, S>,
) -> (
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
) {
    let p_to_acceptors_p2a_batch = unsafe {
        // SAFETY: we use batches to ensure that the log is updated before sending
        // a confirmation to the proposer. Because we use `persist()` on these
        // messages before folding into the log, non-deterministic batch boundaries
        // will not affect the eventual log state.
        p_to_acceptors_p2a.tick_batch(acceptor_tick)
    };

    let a_new_checkpoint = unsafe {
        // SAFETY: we can arbitrarily snapshot the checkpoint sequence number,
        // since a delayed garbage collection does not affect correctness
        a_checkpoint.latest_tick(acceptor_tick)
    }
    .delta()
    .map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq)));
    // .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq)));

    let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
        .clone()
        .cross_singleton(a_max_ballot.clone()) // Don't consider p2as if the current ballot is higher
        .filter_map(q!(|(p2a, max_ballot)|
            if p2a.ballot >= max_ballot {
                Some(CheckpointOrP2a::P2a(p2a))
            } else {
                None
            }
        ));
    let a_log = a_p2as_to_place_in_log
        .chain(a_new_checkpoint.into_stream())
        .all_ticks_atomic()
        .fold_commutative(
            q!(|| (None, HashMap::new())),
            q!(|(prev_checkpoint, log), checkpoint_or_p2a| {
                match checkpoint_or_p2a {
                    CheckpointOrP2a::Checkpoint(new_checkpoint) => {
                        if prev_checkpoint
                            .map(|prev| new_checkpoint > prev)
                            .unwrap_or(true)
                        {
                            for slot in (prev_checkpoint.unwrap_or(0))..new_checkpoint {
                                log.remove(&slot);
                            }

                            *prev_checkpoint = Some(new_checkpoint);
                        }
                    }
                    CheckpointOrP2a::P2a(p2a) => {
                        // This is a regular p2a message. Insert it into the log if it is not checkpointed and has a higher ballot than what was there before
                        if prev_checkpoint.map(|prev| p2a.slot > prev).unwrap_or(true)
                            && log
                                .get(&p2a.slot)
                                .map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot)
                                .unwrap_or(true)
                        {
                            log.insert(
                                p2a.slot,
                                LogValue {
                                    ballot: p2a.ballot,
                                    value: p2a.value,
                                },
                            );
                        }
                    }
                }
            }),
        );

    let a_to_proposers_p2b = p_to_acceptors_p2a_batch
        .cross_singleton(a_max_ballot)
        .map(q!(|(p2a, max_ballot)| (
            p2a.sender,
            (
                (p2a.slot, p2a.ballot),
                if p2a.ballot == max_ballot {
                    Ok(())
                } else {
                    Err(max_ballot)
                }
            )
        )))
        .all_ticks()
        .send_bincode_anonymous(proposers);
    (a_log, a_to_proposers_p2b)
}
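
#[cfg(test)]
mod acceptor_log_example {
    use super::*;

    // Illustrative sketch of the log fold in `acceptor_p2`: a checkpoint
    // garbage-collects every slot below it, and a P2a only lands in the log
    // when its slot is not checkpointed and its ballot beats the existing
    // entry's ballot. Payload type `u32` is an arbitrary stand-in for `P`.
    #[test]
    fn checkpoints_truncate_and_stale_p2as_are_ignored() {
        let ballot = |num| Ballot { num, proposer_id: ClusterId::from_raw(0) };

        let mut log: HashMap<usize, LogValue<u32>> = HashMap::new();
        log.insert(0, LogValue { ballot: ballot(1), value: Some(7) });
        log.insert(1, LogValue { ballot: ballot(1), value: Some(8) });

        // A checkpoint at slot 1 removes slots strictly below it.
        let prev_checkpoint: Option<usize> = None;
        let new_checkpoint = 1usize;
        for slot in prev_checkpoint.unwrap_or(0)..new_checkpoint {
            log.remove(&slot);
        }
        assert!(!log.contains_key(&0) && log.contains_key(&1));

        // A stale P2a (lower ballot) must not overwrite the existing entry.
        let stale = ballot(0);
        let should_insert = log
            .get(&1)
            .map(|prev| stale > prev.ballot)
            .unwrap_or(true);
        assert!(!should_insert);
    }
}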