hydro_test/cluster/paxos.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::time::Duration;

use hydro_lang::stream::AtLeastOnce;
use hydro_lang::*;
use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
use hydro_std::request_response::join_responses;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use super::paxos_with_client::PaxosLike;

#[derive(Serialize, Deserialize, Clone)]
pub struct Proposer {}
pub struct Acceptor {}

#[derive(Clone, Copy)]
pub struct PaxosConfig {
    /// Maximum number of faulty nodes
    pub f: usize,
    /// How often (in seconds) to send "I am leader" heartbeats
    pub i_am_leader_send_timeout: u64,
    /// How often (in seconds) to check if the leader has expired
    pub i_am_leader_check_timeout: u64,
    /// Initial delay (in seconds), multiplied by the proposer id, to stagger proposers checking for timeouts
    pub i_am_leader_check_timeout_delay_multiplier: usize,
}
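
// Example (illustrative, not from the original source): a PaxosConfig tolerating one fault.
// The timeout values are arbitrary placeholder choices for this sketch, in seconds as the
// field docs above describe.
#[cfg(test)]
mod paxos_config_example {
    use super::PaxosConfig;

    #[test]
    fn build_example_config() {
        let config = PaxosConfig {
            f: 1,                                        // tolerate 1 faulty node (2f + 1 = 3 acceptors)
            i_am_leader_send_timeout: 1,                 // heartbeat every second
            i_am_leader_check_timeout: 5,                // check for an expired leader every 5 seconds
            i_am_leader_check_timeout_delay_multiplier: 1,
        };
        assert_eq!(config.f, 1);
    }
}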

pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}

#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug, Hash)]
pub struct Ballot {
    pub num: u32,
    pub proposer_id: MemberId<Proposer>,
}

impl Ord for Ballot {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.num
            .cmp(&other.num)
            .then_with(|| self.proposer_id.raw_id.cmp(&other.proposer_id.raw_id))
    }
}

impl PartialOrd for Ballot {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
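
// Example (illustrative): ballots order by `num` first, breaking ties by proposer id, matching
// the `Ord` impl above. Assumes `MemberId` is exported from the hydro_lang crate root, as the
// glob import in this file suggests.
#[cfg(test)]
mod ballot_ordering_example {
    use hydro_lang::*;

    use super::Ballot;

    #[test]
    fn higher_num_wins_then_proposer_id() {
        let low = Ballot { num: 1, proposer_id: MemberId::from_raw(2) };
        let high = Ballot { num: 2, proposer_id: MemberId::from_raw(0) };
        assert!(high > low); // `num` dominates, regardless of proposer id

        let tie_a = Ballot { num: 2, proposer_id: MemberId::from_raw(0) };
        let tie_b = Ballot { num: 2, proposer_id: MemberId::from_raw(1) };
        assert!(tie_b > tie_a); // ties broken by proposer id
    }
}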

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LogValue<P> {
    pub ballot: Ballot,
    pub value: Option<P>, // might be a hole
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct P2a<P, S> {
    pub sender: MemberId<S>,
    pub ballot: Ballot,
    pub slot: usize,
    pub value: Option<P>, // might be a re-committed hole
}

pub struct CorePaxos<'a> {
    pub proposers: Cluster<'a, Proposer>,
    pub acceptors: Cluster<'a, Acceptor>,
    pub paxos_config: PaxosConfig,
}

impl<'a> PaxosLike<'a> for CorePaxos<'a> {
    type PaxosIn = Proposer;
    type PaxosLog = Acceptor;
    type PaxosOut = Proposer;
    type Ballot = Ballot;

    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
        &self.proposers
    }

    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
        &self.acceptors
    }

    fn get_recipient_from_ballot<L: Location<'a>>(
        ballot: Optional<Self::Ballot, L, Unbounded>,
    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
        ballot.map(q!(|ballot| ballot.proposer_id))
    }

    fn build<P: PaxosPayload>(
        self,
        with_ballot: impl FnOnce(
            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
        nondet_leader: NonDet,
        nondet_commit: NonDet,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
        paxos_core(
            &self.proposers,
            &self.acceptors,
            a_checkpoint,
            with_ballot,
            self.paxos_config,
            nondet_leader,
            nondet_commit,
        )
        .1
    }
}

/// Implements the core Paxos algorithm, which uses a cluster of proposers and acceptors
/// to sequence payloads being sent to the proposers.
///
/// Proposers that currently are the leader will work with acceptors to sequence incoming
/// payloads, but may drop payloads if they are not the leader or lose leadership during
/// the commit process.
///
/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
/// and a stream of sequenced payloads with an index and optional payload (in the case of
/// holes in the log).
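///
/// For example (illustrative only), a consumer of the sequenced output might observe entries
/// shaped like this, where `None` marks a hole re-committed by a new leader:
///
/// ```rust
/// let sequenced: Vec<(usize, Option<&str>)> = vec![
///     (0, Some("put x")),
///     (1, None), // hole filled with a no-op during a leader change
///     (2, Some("put y")),
/// ];
/// assert_eq!(sequenced.len(), 3);
/// ```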
///
/// # Non-Determinism
/// When the leader is stable, the algorithm will commit incoming payloads to the leader
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: PaxosConfig,
    nondet_leader: NonDet,
    nondet_commit: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let f = config.f;

    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let acceptor_tick = acceptors.tick();

    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        f + 1,     // quorum_size
        2 * f + 1, // num_quorum_participants
        config,
        sequencing_max_ballot_forward_reference,
        a_log_forward_reference,
        nondet!(
            /// The primary non-determinism exposed by the leader election algorithm lies in which leader
            /// is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
            /// or leader flag will only lead to failure in sequencing rather than committing the wrong value.
            nondet_leader
        ),
        nondet!(
            /// Because ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
            /// guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
            nondet_commit
        ),
    );

    let just_became_leader = p_is_leader
        .clone()
        .continue_unless(p_is_leader.clone().defer_tick());

    let c_to_proposers = c_to_proposers(
        just_became_leader
            .clone()
            .then(p_ballot.clone())
            .all_ticks(),
    );

    let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        c_to_proposers,
        a_checkpoint,
        p_ballot.clone(),
        p_is_leader,
        p_relevant_p1bs,
        f,
        a_max_ballot,
        nondet!(
            /// The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
            /// we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
            /// The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
            /// of acceptors, which in the worst case will lead to dropped payloads as documented.
            nondet_commit
        ),
    );

    a_log_complete_cycle.complete(a_log.snapshot(nondet!(
        /// We will always write payloads to the log before acknowledging them to the proposers,
        /// which guarantees that if the leader changes, the quorum overlap between sequencing and leader
        /// election will include the committed value.
    )));
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        // Only tell the clients once when leader election concludes
        just_became_leader.then(p_ballot).all_ticks(),
        p_to_replicas,
    )
}

#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
pub fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    quorum_size: usize,
    num_quorum_participants: usize,
    paxos_config: PaxosConfig,
    p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    nondet_leader: NonDet,
    nondet_acceptor_ballot: NonDet,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
    let (p1b_fail_complete, p1b_fail) =
        proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
        proposers.forward_ref::<Stream<_, _, _, NoOrder, AtLeastOnce>>();
    let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
        proposer_tick.forward_ref::<Optional<(), _, _>>();
    // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b)));
    // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot)));
    // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));

    let p_received_max_ballot = p1b_fail
        .interleave(p_received_p2b_ballots)
        .interleave(p_to_proposers_i_am_leader_forward_ref)
        .max()
        .unwrap_or(proposers.singleton(q!(Ballot {
            num: 0,
            proposer_id: MemberId::from_raw(0)
        })));

    let (p_ballot, p_has_largest_ballot) = p_ballot_calc(
        proposer_tick,
        p_received_max_ballot.snapshot(
            proposer_tick,
            nondet!(
                /// A stale max ballot might result in us failing to become the leader, but which proposer
                /// becomes the leader is non-deterministic anyway.
                nondet_leader
            ),
        ),
    );

    let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
        proposers,
        proposer_tick,
        p_is_leader_forward_ref,
        p_ballot.clone(),
        paxos_config,
        nondet!(
            /// Non-determinism in heartbeats may lead to additional leader election attempts, which
            /// is propagated to the non-determinism of which leader is elected.
            nondet_leader
        ),
    );

    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);

    let p_to_acceptors_p1a = p_trigger_election
        .then(p_ballot.clone())
        .all_ticks()
        .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
        .broadcast_bincode(acceptors, nondet!(/** TODO */))
        .values();

    let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
        acceptor_tick,
        p_to_acceptors_p1a.batch(
            acceptor_tick,
            nondet!(
                /// Non-deterministic batching may result in different payloads being rejected
                /// by an acceptor if the payload is batched with another payload with a larger ballot.
                /// But as documented, payloads may be non-deterministically dropped during leader election.
                nondet_acceptor_ballot
            ),
        ),
        a_log,
        proposers,
    );

    let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
        proposer_tick,
        a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
        p_ballot.clone(),
        p_has_largest_ballot,
        quorum_size,
        num_quorum_participants,
    );
    p_is_leader_complete_cycle.complete(p_is_leader.clone());
    p1b_fail_complete.complete(fail_ballots);

    (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
}

// Proposer logic to calculate the next ballot number. Expects p_received_max_ballot, the largest
// ballot received so far. Outputs the proposer's current ballot (p_ballot) and p_has_largest_ballot,
// which only contains a value if we have the largest ballot.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_ballot_calc<'a>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let (p_ballot_num_complete_cycle, p_ballot_num) =
        proposer_tick.cycle_with_initial(proposer_tick.singleton(q!(0)));

    let p_new_ballot_num = p_received_max_ballot
        .clone()
        .zip(p_ballot_num.clone())
        .map(q!(move |(received_max_ballot, ballot_num)| {
            if received_max_ballot
                > (Ballot {
                    num: ballot_num,
                    proposer_id: CLUSTER_SELF_ID,
                })
            {
                received_max_ballot.num + 1
            } else {
                ballot_num
            }
        }));
    p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num);

    let p_ballot = p_ballot_num.map(q!(move |num| Ballot {
        num,
        proposer_id: CLUSTER_SELF_ID
    }));

    let p_has_largest_ballot = p_received_max_ballot
        .clone()
        .zip(p_ballot.clone())
        .filter(q!(
            |(received_max_ballot, cur_ballot)| *received_max_ballot <= *cur_ballot
        ))
        .map(q!(|_| ()));

    // End stable leader election
    (p_ballot, p_has_largest_ballot)
}
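
// Example (illustrative): the ballot-number bump rule used in `p_ballot_calc`. If another
// proposer's ballot exceeds ours, we jump to one past its number; otherwise we keep ours.
// `next_ballot_num` is a hypothetical test-only helper mirroring the closure above.
#[cfg(test)]
mod ballot_calc_example {
    use hydro_lang::*;

    use super::{Ballot, Proposer};

    // Hypothetical helper: mirrors the `p_new_ballot_num` closure for illustration.
    fn next_ballot_num(received_max: Ballot, our_num: u32, our_id: MemberId<Proposer>) -> u32 {
        if received_max > (Ballot { num: our_num, proposer_id: our_id }) {
            received_max.num + 1
        } else {
            our_num
        }
    }

    #[test]
    fn bumps_past_larger_ballots() {
        let me = MemberId::from_raw(0);
        let rival = Ballot { num: 5, proposer_id: MemberId::from_raw(2) };
        assert_eq!(next_ballot_num(rival, 3, me), 6); // jump past the rival's ballot
        assert_eq!(next_ballot_num(rival, 7, me), 7); // already ahead, keep our number
    }
}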

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_leader_heartbeat<'a>(
    proposers: &Cluster<'a, Proposer>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    paxos_config: PaxosConfig,
    nondet_reelection: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder, AtLeastOnce>,
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
    let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
    let i_am_leader_check_timeout_delay_multiplier =
        paxos_config.i_am_leader_check_timeout_delay_multiplier;

    let p_to_proposers_i_am_leader = p_is_leader
        .clone()
        .then(p_ballot)
        .latest()
        .sample_every(
            q!(Duration::from_secs(i_am_leader_send_timeout)),
            nondet!(
                /// Delays in heartbeats may lead to leader election attempts even
                /// if the leader is alive. This will result in the previous leader receiving
                /// larger ballots from its peers, and it will drop its leadership.
                nondet_reelection
            ),
        )
        .broadcast_bincode(proposers, nondet!(/** TODO */))
        .values();

    let p_leader_expired = p_to_proposers_i_am_leader
        .clone()
        .timeout(
            q!(Duration::from_secs(i_am_leader_check_timeout)),
            nondet!(
                /// Delayed timeouts only affect which leader wins re-election. If the leadership flag
                /// is gained after the timeout, we correctly ignore the timeout. If the flag is lost after
                /// the timeout, we correctly attempt to become the leader.
                nondet_reelection
            ),
        )
        .snapshot(proposer_tick, nondet!(/** absorbed into timeout */))
        .continue_unless(p_is_leader);

    // Add a delay proportional to the node ID so not everyone sends P1as at the same time
    let p_trigger_election = p_leader_expired.continue_if(
        proposers
            .source_interval_delayed(
                q!(Duration::from_secs(
                    (CLUSTER_SELF_ID.raw_id * i_am_leader_check_timeout_delay_multiplier as u32)
                        .into()
                )),
                q!(Duration::from_secs(i_am_leader_check_timeout)),
                nondet!(
                    /// If the leader 'un-expires' due to non-deterministic delay, we return
                    /// to a stable leader state. If the leader remains expired, non-deterministic
                    /// delay is propagated to the non-determinism of which leader is elected.
                    nondet_reelection
                ),
            )
            .batch(proposer_tick, nondet!(/** absorbed into interval */))
            .first(),
    );
    (p_to_proposers_i_am_leader, p_trigger_election)
}
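
// Example (illustrative): the staggered election delay computed above. Proposer i waits
// i * i_am_leader_check_timeout_delay_multiplier seconds before its first timeout check, so
// proposers do not all send P1as simultaneously. The multiplier value here is an arbitrary
// placeholder for the sketch.
#[cfg(test)]
mod heartbeat_stagger_example {
    use std::time::Duration;

    #[test]
    fn delays_are_staggered_by_proposer_id() {
        let multiplier: usize = 2; // assumed config value for the sketch
        let delays: Vec<Duration> = (0u32..3)
            .map(|raw_id| Duration::from_secs((raw_id * multiplier as u32).into()))
            .collect();
        assert_eq!(delays, vec![
            Duration::from_secs(0),
            Duration::from_secs(2),
            Duration::from_secs(4),
        ]);
    }
}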

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    proposers: &Cluster<'a, Proposer>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let a_max_ballot = p_to_acceptors_p1a
        .clone()
        .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
        .persist()
        .max()
        .unwrap_or(acceptor_tick.singleton(q!(Ballot {
            num: 0,
            proposer_id: MemberId::from_raw(0)
        })));

    (
        a_max_ballot.clone(),
        p_to_acceptors_p1a
            .cross_singleton(a_max_ballot)
            .cross_singleton(a_log)
            .map(q!(|((ballot, max_ballot), log)| (
                ballot.proposer_id,
                (
                    ballot,
                    if ballot == max_ballot {
                        Ok(log)
                    } else {
                        Err(max_ballot)
                    }
                )
            )))
            .all_ticks()
            .demux_bincode(proposers)
            .values(),
    )
}

// Proposer logic for processing P1bs: determines whether the proposer is now the leader and
// collects the accepted values from a quorum, which are later used to decide which uncommitted
// entries to re-commit and which no-ops to commit to fill log holes.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    a_to_proposers_p1b: Stream<
        (Ballot, Result<(Option<usize>, P), Ballot>),
        Cluster<'a, Proposer>,
        Unbounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_has_largest_ballot: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    quorum_size: usize,
    num_quorum_participants: usize,
) -> (
    Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (quorums, fails) = collect_quorum_with_response(
        a_to_proposers_p1b.atomic(proposer_tick),
        quorum_size,
        num_quorum_participants,
    );

    let p_received_quorum_of_p1bs = quorums
        .into_keyed()
        .assume_ordering::<TotalOrder>(nondet!(
            /// We use `flatten_unordered` later
        ))
        .fold(
            q!(|| vec![]),
            q!(|logs, log| {
                logs.push(log);
            }),
        )
        .snapshot(nondet!(/** see above */))
        .entries()
        .max_by_key(q!(|t| t.0))
        .zip(p_ballot.clone())
        .filter_map(q!(
            move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
                Some(quorum_accepted)
            } else {
                None
            }
        ));

    let p_is_leader = p_received_quorum_of_p1bs
        .clone()
        .map(q!(|_| ()))
        .continue_if(p_has_largest_ballot.clone());

    (
        p_is_leader,
        // we used an unordered accumulator, so the flattened output has no order
        p_received_quorum_of_p1bs.flatten_unordered(),
        fails.end_atomic().map(q!(|(_, ballot)| ballot)),
    )
}

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
    accepted_logs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    f: usize,
) -> (
    Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let p_p1b_max_checkpoint = accepted_logs
        .clone()
        .filter_map(q!(|(checkpoint, _log)| checkpoint))
        .max()
        .into_singleton();
    let p_p1b_highest_entries_and_count = accepted_logs
        .map(q!(|(_checkpoint, log)| log))
        .flatten_unordered() // Convert HashMap log back to stream
        .into_keyed()
        .fold_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
            if let Some(curr_entry_payload) = &mut curr_entry.1 {
                let same_values = new_entry.value == curr_entry_payload.value;
                let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
                // Increment count if the values are the same
                if same_values {
                    curr_entry.0 += 1;
                }
                // Replace the ballot with the largest one
                if higher_ballot {
                    curr_entry_payload.ballot = new_entry.ballot;
                    // Replace the value with the one from the largest ballot, if necessary
                    if !same_values {
                        curr_entry.0 = 1;
                        curr_entry_payload.value = new_entry.value;
                    }
                }
            } else {
                *curr_entry = (1, Some(new_entry));
            }
        }))
        .map(q!(|(count, entry)| (count, entry.unwrap())));
    let p_log_to_try_commit = p_p1b_highest_entries_and_count
        .clone()
        .entries()
        .cross_singleton(p_ballot.clone())
        .cross_singleton(p_p1b_max_checkpoint.clone())
        .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
            if count > f {
                return None;
            } else if let Some(checkpoint) = checkpoint
                && slot <= checkpoint
            {
                return None;
            }
            Some(((slot, ballot), entry.value))
        }));
    let p_max_slot = p_p1b_highest_entries_and_count.clone().keys().max();
    let p_proposed_slots = p_p1b_highest_entries_and_count.clone().keys();
    let p_log_holes = p_max_slot
        .clone()
        .zip(p_p1b_max_checkpoint)
        .flat_map_ordered(q!(|(max_slot, checkpoint)| {
            if let Some(checkpoint) = checkpoint {
                (checkpoint + 1)..max_slot
            } else {
                0..max_slot
            }
        }))
        .filter_not_in(p_proposed_slots)
        .cross_singleton(p_ballot.clone())
        .map(q!(move |(slot, ballot)| ((slot, ballot), None)));

    (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
}
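
// Example (illustrative): the per-slot merge rule used in `recommit_after_leader_election`.
// Among P1b log entries for one slot, the value attached to the highest ballot wins, and the
// count tracks how many entries reported that winning value. The loop below is a test-only
// mirror of the fold closure above, run over plain vectors rather than hydro streams.
#[cfg(test)]
mod recommit_merge_example {
    use hydro_lang::*;

    use super::{Ballot, LogValue};

    #[test]
    fn highest_ballot_value_wins() {
        let b = |num| Ballot { num, proposer_id: MemberId::from_raw(0) };
        let entries = vec![
            LogValue { ballot: b(1), value: Some("old") },
            LogValue { ballot: b(3), value: Some("new") },
            LogValue { ballot: b(2), value: Some("old") },
        ];

        // Accumulator shape matches the fold: (count, Option<LogValue<_>>).
        let mut acc: (usize, Option<LogValue<&str>>) = (0, None);
        for new_entry in entries {
            if let Some(curr) = &mut acc.1 {
                let same_values = new_entry.value == curr.value;
                let higher_ballot = new_entry.ballot > curr.ballot;
                if same_values {
                    acc.0 += 1;
                }
                if higher_ballot {
                    curr.ballot = new_entry.ballot;
                    if !same_values {
                        acc.0 = 1;
                        curr.value = new_entry.value;
                    }
                }
            } else {
                acc = (1, Some(new_entry));
            }
        }

        let (count, winner) = acc;
        assert_eq!(winner.unwrap().value, Some("new")); // value from the highest ballot (3)
        assert_eq!(count, 1); // only one entry carried the winning value
    }
}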

#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
fn sequence_payload<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,

    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,

    p_relevant_p1bs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    f: usize,

    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,

    nondet_commit: NonDet,
) -> (
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (p_log_to_recommit, p_max_slot) =
        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);

    let indexed_payloads = index_payloads(
        proposer_tick,
        p_max_slot,
        c_to_proposers
            .batch(
                proposer_tick,
                nondet!(
                    /// We batch payloads so that we can compute the correct slot based on
                    /// the base slot. In the case of a leader re-election, the base slot is updated, which
                    /// affects the computed payload slots. This non-determinism can lead to non-determinism
                    /// in which payloads are committed when the leader is changing, which is documented at
                    /// the function level.
                    nondet_commit
                ),
            )
            .continue_if(p_is_leader.clone()),
    );

    let payloads_to_send = indexed_payloads
        .cross_singleton(p_ballot.clone())
        .map(q!(|((slot, payload), ballot)| (
            (slot, ballot),
            Some(payload)
        )))
        .chain(p_log_to_recommit)
        .continue_if(p_is_leader)
        .all_ticks_atomic();

    let (a_log, a_to_proposers_p2b) = acceptor_p2(
        acceptor_tick,
        a_max_ballot.clone(),
        payloads_to_send
            .clone()
            .end_atomic()
            .map(q!(move |((slot, ballot), value)| P2a {
                sender: CLUSTER_SELF_ID,
                ballot,
                slot,
                value
            }))
            .broadcast_bincode(acceptors, nondet!(/** TODO */))
            .values(),
        a_checkpoint,
        proposers,
    );

    // TODO: only persist if we are the leader
    let (quorums, fails) =
        collect_quorum(a_to_proposers_p2b.atomic(proposer_tick), f + 1, 2 * f + 1);

    let p_to_replicas = join_responses(
        proposer_tick,
        quorums.map(q!(|k| (k, ()))),
        payloads_to_send.batch(nondet!(
            /// The metadata will always be generated before we get a quorum
            /// because `payloads_to_send` is used to send the payloads to acceptors.
        )),
    );

    (
        p_to_replicas
            .end_atomic()
            .map(q!(|((slot, _ballot), (value, _))| (slot, value))),
        a_log,
        fails.end_atomic().map(q!(|(_, ballot)| ballot)),
    )
}

// Proposer logic to assign slots to incoming payloads, outputting (slot, payload) pairs to be sent
// to acceptors as P2as; the next available slot is carried across ticks via a cycle.
pub fn index_payloads<'a, P: PaxosPayload>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_max_slot: Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
    c_to_proposers: Stream<P, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> Stream<(usize, P), Tick<Cluster<'a, Proposer>>, Bounded> {
    let (p_next_slot_complete_cycle, p_next_slot) =
        proposer_tick.cycle_with_initial::<Singleton<usize, _, _>>(proposer_tick.singleton(q!(0)));
    let p_next_slot_after_reconciling_p1bs = p_max_slot.map(q!(|max_slot| max_slot + 1));

    let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot);

    let p_indexed_payloads = c_to_proposers
        .enumerate()
        .cross_singleton(base_slot.clone())
        .map(q!(|((index, payload), base_slot)| (
            base_slot + index,
            payload
        )));

    let p_num_payloads = p_indexed_payloads.clone().count();
    let p_next_slot_after_sending_payloads =
        p_num_payloads
            .clone()
            .zip(base_slot)
            .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));

    p_next_slot_complete_cycle.complete_next_tick(p_next_slot_after_sending_payloads);
    p_indexed_payloads
}
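
// Example (illustrative): the slot arithmetic in `index_payloads`. Payloads batched in one tick
// get consecutive slots starting at the base slot, and the next tick's base slot advances by the
// number of payloads sent. The concrete values are arbitrary placeholders for the sketch.
#[cfg(test)]
mod index_payloads_example {
    #[test]
    fn consecutive_slots_from_base() {
        let base_slot = 10usize;
        let payloads = ["a", "b", "c"];

        let indexed: Vec<(usize, &str)> = payloads
            .iter()
            .enumerate()
            .map(|(index, payload)| (base_slot + index, *payload))
            .collect();
        assert_eq!(indexed, vec![(10, "a"), (11, "b"), (12, "c")]);

        let next_slot = base_slot + payloads.len();
        assert_eq!(next_slot, 13); // where the next batch will start
    }
}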

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    proposers: &Cluster<'a, S>,
) -> (
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
) {
    let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.batch(
        acceptor_tick,
        nondet!(
            /// We use batches to ensure that the log is updated before sending
            /// a confirmation to the proposer. Because we use `persist()` on these
            /// messages before folding into the log, non-deterministic batch boundaries
            /// will not affect the eventual log state.
        ),
    );

    let a_checkpoint = a_checkpoint.snapshot(
        acceptor_tick,
        nondet!(
            /// We can arbitrarily snapshot the checkpoint sequence number,
            /// since a delayed garbage collection does not affect correctness.
        ),
    );
    // .inspect(q!(|min_seq| println!("Acceptor new checkpoint: {:?}", min_seq)));

    let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
        .clone()
        .cross_singleton(a_max_ballot.clone()) // Don't consider p2as if the current ballot is higher
        .filter_map(q!(|(p2a, max_ballot)|
            if p2a.ballot >= max_ballot {
                Some((p2a.slot, LogValue {
                    ballot: p2a.ballot,
                    value: p2a.value,
                }))
            } else {
                None
            }
        ));
    let a_log = a_p2as_to_place_in_log
        .all_ticks_atomic()
        .into_keyed()
        .reduce_watermark_commutative(
            a_checkpoint.clone(),
            q!(|prev_entry, entry| {
                // Insert p2a into the log if it has a higher ballot than what was there before
                if entry.ballot > prev_entry.ballot {
                    *prev_entry = LogValue {
                        ballot: entry.ballot,
                        value: entry.value,
                    };
                }
            }),
        );
    let a_log_snapshot = a_log
        .snapshot(nondet!(
            /// We need to know the current state of the log for p1b
            /// TODO(shadaj): this isn't a justification for correctness
        ))
        .entries()
        .fold_commutative(
            q!(|| HashMap::new()),
            q!(|map, (slot, entry)| {
                map.insert(slot, entry);
            }),
        );

    let a_to_proposers_p2b = p_to_acceptors_p2a_batch
        .cross_singleton(a_max_ballot)
        .map(q!(|(p2a, max_ballot)| (
            p2a.sender,
            (
                (p2a.slot, p2a.ballot),
                if p2a.ballot == max_ballot {
                    Ok(())
                } else {
                    Err(max_ballot)
                }
            )
        )))
        .all_ticks()
        .demux_bincode(proposers)
        .values();

    (
        a_checkpoint
            .into_singleton()
            .zip(a_log_snapshot)
            .latest_atomic(),
        a_to_proposers_p2b,
    )
}
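
// Example (illustrative): the acceptor log-update rule from `acceptor_p2`. A P2a overwrites the
// stored entry for its slot only if it carries a strictly higher ballot. `place` is a hypothetical
// test-only helper mirroring the reduce closure above over a plain HashMap.
#[cfg(test)]
mod acceptor_log_example {
    use std::collections::HashMap;

    use hydro_lang::*;

    use super::{Ballot, LogValue};

    // Hypothetical helper: keep the entry with the highest ballot per slot.
    fn place(
        log: &mut HashMap<usize, LogValue<&'static str>>,
        slot: usize,
        entry: LogValue<&'static str>,
    ) {
        match log.get_mut(&slot) {
            Some(prev) if entry.ballot > prev.ballot => *prev = entry,
            Some(_) => {} // lower or equal ballot: keep the existing entry
            None => {
                log.insert(slot, entry);
            }
        }
    }

    #[test]
    fn higher_ballot_overwrites_slot() {
        let b = |num| Ballot { num, proposer_id: MemberId::from_raw(0) };
        let mut log = HashMap::new();

        place(&mut log, 0, LogValue { ballot: b(2), value: Some("v1") });
        place(&mut log, 0, LogValue { ballot: b(1), value: Some("stale") }); // ignored: lower ballot
        place(&mut log, 0, LogValue { ballot: b(3), value: Some("v2") }); // accepted: higher ballot

        assert_eq!(log[&0].value, Some("v2"));
        assert_eq!(log[&0].ballot, b(3));
    }
}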