Skip to main content

hydro_test/cluster/
paxos.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::time::Duration;
5
6use hydro_lang::live_collections::sliced::yield_atomic;
7use hydro_lang::live_collections::stream::{AtLeastOnce, NoOrder, TotalOrder};
8use hydro_lang::location::cluster::CLUSTER_SELF_ID;
9use hydro_lang::location::{Atomic, Location, MemberId, NoTick};
10use hydro_lang::prelude::*;
11use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
12use hydro_std::request_response::join_responses;
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16use super::paxos_with_client::PaxosLike;
17
/// Marker type for the proposer role in the Paxos cluster.
#[derive(Serialize, Deserialize, Clone)]
pub struct Proposer {}
/// Marker type for the acceptor role in the Paxos cluster.
pub struct Acceptor {}
21
/// Tuning parameters for the Paxos protocol (fault tolerance and
/// leader-failure-detection timing).
#[derive(Clone, Copy)]
pub struct PaxosConfig {
    /// Maximum number of faulty nodes
    pub f: usize,
    /// How often (in seconds) to send "I am leader" heartbeats
    pub i_am_leader_send_timeout: u64,
    /// How often (in seconds) to check if the leader has expired
    pub i_am_leader_check_timeout: u64,
    /// Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts
    pub i_am_leader_check_timeout_delay_multiplier: usize,
}
33
/// Bounds required of payloads sequenced by Paxos; blanket-implemented for every
/// type that satisfies them, so callers never implement this trait by hand.
pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}
36
/// A Paxos ballot, totally ordered by round number and then by proposer id
/// (see the `Ord` impl below).
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
pub struct Ballot {
    /// Round number; bumped past the largest observed ballot when contending.
    pub num: u32,
    /// The proposer that issued this ballot; breaks ties between equal round
    /// numbers so ballots from distinct proposers never compare equal.
    pub proposer_id: MemberId<Proposer>,
}
42
43impl Ord for Ballot {
44    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45        self.num
46            .cmp(&other.num)
47            .then_with(|| self.proposer_id.cmp(&other.proposer_id))
48    }
49}
50
51impl PartialOrd for Ballot {
52    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
53        Some(self.cmp(other))
54    }
55}
56
/// One entry in an acceptor's log: the highest ballot under which a value was
/// accepted for a slot, together with that value.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LogValue<P> {
    /// Ballot under which this entry was accepted.
    pub ballot: Ballot,
    pub value: Option<P>, // might be a hole
}
62
/// Phase-2a message asking an acceptor to accept `value` at `slot` under `ballot`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct P2a<P, S> {
    /// The member that sent this message (used to route the p2b reply).
    pub sender: MemberId<S>,
    pub ballot: Ballot,
    pub slot: usize,
    pub value: Option<P>, // might be a re-committed hole
}
70
/// The clusters and configuration needed to instantiate core Paxos
/// via the [`PaxosLike`] trait.
pub struct CorePaxos<'a> {
    pub proposers: Cluster<'a, Proposer>,
    pub acceptors: Cluster<'a, Acceptor>,
    pub paxos_config: PaxosConfig,
}
76
impl<'a> PaxosLike<'a> for CorePaxos<'a> {
    type PaxosIn = Proposer;
    type PaxosLog = Acceptor;
    type PaxosOut = Proposer;
    type Ballot = Ballot;

    /// Payloads enter the protocol at the proposers.
    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
        &self.proposers
    }

    /// The replicated log lives on the acceptors.
    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
        &self.acceptors
    }

    /// The member to contact for a ballot is the proposer that issued it.
    fn get_recipient_from_ballot<L: Location<'a>>(
        ballot: Optional<Self::Ballot, L, Unbounded>,
    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
        ballot.map(q!(|ballot| ballot.proposer_id))
    }

    /// Runs [`paxos_core`] and returns only the sequenced-payload stream,
    /// discarding the stream of newly elected ballots (`.1` of the pair).
    fn build<P: PaxosPayload>(
        self,
        with_ballot: impl FnOnce(
            Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
        a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
        nondet_leader: NonDet,
        nondet_commit: NonDet,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
        paxos_core(
            &self.proposers,
            &self.acceptors,
            a_checkpoint,
            with_ballot,
            self.paxos_config,
            nondet_leader,
            nondet_commit,
        )
        .1
    }
}
118
/// Implements the core Paxos algorithm, which uses a cluster of proposers and acceptors
/// to sequence payloads being sent to the proposers.
///
/// Proposers that currently are the leader will work with acceptors to sequence incoming
/// payloads, but may drop payloads if they are not the leader or lose leadership during
/// the commit process.
///
/// Returns a stream of ballots, where new values are emitted when a new leader is elected,
/// and a stream of sequenced payloads with an index and optional payload (in the case of
/// holes in the log).
///
/// # Non-Determinism
/// When the leader is stable, the algorithm will commit incoming payloads to the leader
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: PaxosConfig,
    nondet_leader: NonDet,
    nondet_commit: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let f = config.f;

    // Startup banners; useful when eyeballing deployment logs.
    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let acceptor_tick = acceptors.tick();

    // Forward references break the cycles between leader election and sequencing:
    // election consumes ballots rejected during sequencing, and sequencing produces
    // the acceptor log that election reads.
    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        f + 1,
        2 * f + 1,
        config,
        sequencing_max_ballot_forward_reference,
        a_log_forward_reference,
        nondet!(
            /// The primary non-determinism exposed by leader election algorithm lies in which leader
            /// is elected, which affects both the ballot at each proposer and the leader flag. But using a stale ballot
            /// or leader flag will only lead to failure in sequencing rather than committing the wrong value.
            nondet_leader
        ),
        nondet!(
            /// Because ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are
            /// guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader.
            nondet_commit
        ),
    );

    // True exactly on ticks where the leader flag flips from false to true;
    // `defer_tick` exposes the previous tick's flag.
    let was_not_leader = p_is_leader
        .clone()
        .map(q!(|is_leader| is_leader.then_some(())))
        .into_optional()
        .defer_tick()
        .is_none();
    let just_became_leader = p_is_leader.clone().and(was_not_leader);

    // Hand the client callback a stream that emits our ballot once per election win.
    let c_to_proposers = c_to_proposers(
        p_ballot
            .clone()
            .filter_if(just_became_leader.clone())
            .all_ticks(),
    );

    let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        c_to_proposers,
        a_checkpoint,
        p_ballot.clone(),
        p_is_leader,
        p_relevant_p1bs,
        f,
        a_max_ballot,
        nondet!(
            /// The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
            /// we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
            /// The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state
            /// of acceptors, which in the worst case will lead to dropped payloads as documented.
            nondet_commit
        ),
    );

    a_log_complete_cycle.complete(a_log.snapshot_atomic(
        &acceptor_tick,
        nondet!(
            /// We will always write payloads to the log before acknowledging them to the proposers,
            /// which guarantees that if the leader changes the quorum overlap between sequencing and leader
            /// election will include the committed value.
        ),
    ));
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        // Only tell the clients once when leader election concludes
        p_ballot.filter_if(just_became_leader).all_ticks(),
        p_to_replicas,
    )
}
243
/// Runs Paxos phase 1 plus heartbeat-based failure detection to elect a leader.
///
/// Returns, on the proposers, the current ballot (per tick), a flag indicating
/// whether this proposer believes it is the leader, and the `(checkpoint, log)`
/// entries gathered from a quorum of p1b responses; on the acceptors, the
/// maximum ballot observed so far.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
pub fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    quorum_size: usize,
    num_quorum_participants: usize,
    paxos_config: PaxosConfig,
    p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    nondet_leader: NonDet,
    nondet_acceptor_ballot: NonDet,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
    let (p1b_fail_complete, p1b_fail) =
        proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
        proposers.forward_ref::<Stream<_, _, _, NoOrder, AtLeastOnce>>();
    let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
        proposer_tick.forward_ref::<Singleton<bool, _, _>>();
    // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b)));
    // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot)));
    // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));

    // Largest ballot observed from any source (p1b rejections, p2b rejections,
    // peer heartbeats); defaults to ballot 0 from proposer 0 when nothing has
    // been received yet.
    let p_received_max_ballot = p1b_fail
        .merge_unordered(p_received_p2b_ballots)
        .merge_unordered(p_to_proposers_i_am_leader_forward_ref)
        .max()
        .unwrap_or(
            proposers
                .singleton(q!(Ballot {
                    num: 0,
                    proposer_id: MemberId::from_raw_id(0)
                }))
                .into(),
        );

    let (p_ballot, p_has_largest_ballot) = p_ballot_calc(p_received_max_ballot.snapshot(
        proposer_tick,
        nondet!(
            /// A stale max ballot might result in us failing to become the leader, but which proposer
            /// becomes the leader is non-deterministic anyway.
            nondet_leader
        ),
    ));

    let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
        proposers,
        proposer_tick,
        p_is_leader_forward_ref,
        p_ballot.clone(),
        paxos_config,
        nondet!(
            /// Non-determinism in heartbeats may lead to additional leader election attempts, which
            /// is propagated to the non-determinism of which leader is elected.
            nondet_leader
        ),
    );

    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);

    // Phase 1a: when the failure detector fires, broadcast our ballot to all acceptors.
    let p_to_acceptors_p1a = p_ballot
        .clone()
        .filter_if(p_trigger_election)
        .all_ticks()
        .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
        .broadcast(acceptors, TCP.fail_stop().bincode(), nondet!(/** TODO */))
        .values();

    let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
        acceptor_tick,
        p_to_acceptors_p1a.batch(
            acceptor_tick,
            nondet!(
                /// Non-deterministic batching may result in different payloads being rejected
                /// by an acceptor if the payload is batched with another payload with larger ballot.
                /// But as documented, payloads may be non-deterministically dropped during leader election.
                nondet_acceptor_ballot
            ),
        ),
        a_log,
        proposers,
    );

    let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
        proposer_tick,
        a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
        p_ballot.clone(),
        p_has_largest_ballot,
        quorum_size,
        num_quorum_participants,
    );
    p_is_leader_complete_cycle.complete(p_is_leader.clone());
    p1b_fail_complete.complete(fail_ballots);

    (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
}
350
// Proposer logic to calculate the next ballot. Expects p_received_max_ballot, the largest
// ballot received so far. Outputs p_ballot, this proposer's current ballot, and
// p_has_largest_ballot, true only when our ballot is at least as large as every received one.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_ballot_calc<'a>(
    p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let tick = p_received_max_ballot.location().clone();
    let (p_ballot, p_has_largest_ballot) = sliced! {
        let p_received_max_ballot = use::atomic(p_received_max_ballot.latest_atomic(), nondet!(/** up to date with tick input */));
        // Persisted round-number counter, starting at 0.
        let mut p_ballot_num = use::state(|l| l.singleton(q!(0)));

        // Bump our round number past any larger ballot we have observed; otherwise keep it.
        p_ballot_num = p_received_max_ballot
            .clone()
            .zip(p_ballot_num)
            .map(q!(move |(received_max_ballot, ballot_num)| {
                if received_max_ballot
                    > (Ballot {
                        num: ballot_num,
                        proposer_id: CLUSTER_SELF_ID.clone(),
                    })
                {
                    received_max_ballot.num + 1
                } else {
                    ballot_num
                }
            }));

        let p_ballot = p_ballot_num.clone().map(q!(move |num| Ballot {
            num,
            proposer_id: CLUSTER_SELF_ID.clone()
        }));

        let p_has_largest_ballot = p_received_max_ballot
            .zip(p_ballot.clone())
            .map(q!(
                |(received_max_ballot, cur_ballot)| received_max_ballot <= cur_ballot
            ));

        (yield_atomic(p_ballot), yield_atomic(p_has_largest_ballot))
    };

    (
        p_ballot.snapshot_atomic(
            &tick,
            nondet!(/** always up to date with received ballots */),
        ),
        p_has_largest_ballot.snapshot_atomic(
            &tick,
            nondet!(/** always up to date with received ballots */),
        ),
    )
}
405
/// Leader heartbeats and failure detection: the current leader periodically
/// broadcasts its ballot to all proposers, and proposers that have not heard a
/// heartbeat within the check timeout trigger a (staggered) election attempt.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_leader_heartbeat<'a>(
    proposers: &Cluster<'a, Proposer>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    paxos_config: PaxosConfig,
    nondet_reelection: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder, AtLeastOnce>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
    let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
    let i_am_leader_check_timeout_delay_multiplier =
        paxos_config.i_am_leader_check_timeout_delay_multiplier;

    // Only the leader broadcasts heartbeats, sampled on the send-timeout interval.
    let p_to_proposers_i_am_leader = p_ballot
        .filter_if(p_is_leader.clone())
        .latest()
        .sample_every(
            q!(Duration::from_secs(i_am_leader_send_timeout)),
            nondet!(
                /// Delays in heartbeats may lead to leader election attempts even
                /// if the leader is alive. This will result in the previous leader receiving
                /// larger ballots from its peers and it will drop its leadership.
                nondet_reelection
            ),
        )
        .broadcast(proposers, TCP.fail_stop().bincode(), nondet!(/** TODO */))
        .values();

    // The leader itself never considers the leader expired (`!p_is_leader` filter).
    let p_leader_expired = p_to_proposers_i_am_leader
        .clone()
        .timeout(
            q!(Duration::from_secs(i_am_leader_check_timeout)),
            nondet!(
                /// Delayed timeouts only affect which leader wins re-election. If the leadership flag
                /// is gained after timeout correctly ignore the timeout. If the flag is lost after
                /// timeout we correctly attempt to become the leader.
                nondet_reelection
            ),
        )
        .snapshot(proposer_tick, nondet!(/** absorbed into timeout */))
        .filter_if(!p_is_leader);

    // Add random delay depending on node ID so not everyone sends p1a at the same time
    let p_trigger_election = p_leader_expired.is_some().and(
        proposers
            .source_interval_delayed(
                q!(Duration::from_secs(
                    (CLUSTER_SELF_ID.get_raw_id()
                        * i_am_leader_check_timeout_delay_multiplier as u32)
                        .into()
                )),
                q!(Duration::from_secs(i_am_leader_check_timeout)),
                nondet!(
                    /// If the leader 'un-expires' due to non-deterministic delay, we return
                    /// to a stable leader state. If the leader remains expired, non-deterministic
                    /// delay is propagated to the non-determinism of which leader is elected.
                    nondet_reelection
                ),
            )
            .batch(proposer_tick, nondet!(/** absorbed into interval */))
            .first()
            .is_some(),
    );
    (p_to_proposers_i_am_leader, p_trigger_election)
}
475
/// Acceptor phase 1: tracks the maximum ballot across all received p1as and
/// replies to each p1a with `Ok((checkpoint, log))` when its ballot equals the
/// current maximum, or `Err(max_ballot)` to reject it.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    proposers: &Cluster<'a, Proposer>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    // Running maximum over all p1a ballots ever received; ballot 0 from
    // proposer 0 before any p1a arrives.
    let a_max_ballot = p_to_acceptors_p1a
        .clone()
        .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
        .across_ticks(|s| s.max())
        .unwrap_or(acceptor_tick.singleton(q!(Ballot {
            num: 0,
            proposer_id: MemberId::from_raw_id(0)
        })));

    (
        a_max_ballot.clone(),
        // Route each p1b back to the proposer that sent the p1a.
        p_to_acceptors_p1a
            .cross_singleton(a_max_ballot)
            .cross_singleton(a_log)
            .map(q!(|((ballot, max_ballot), log)| (
                ballot.proposer_id.clone(),
                (
                    ballot.clone(),
                    if ballot == max_ballot {
                        Ok(log)
                    } else {
                        Err(max_ballot)
                    }
                )
            )))
            .all_ticks()
            .demux(proposers, TCP.fail_stop().bincode())
            .values(),
    )
}
516
// Proposer logic for processing p1bs: determines whether this proposer is now the leader
// (a quorum of p1bs for its current ballot, and it holds the largest ballot), emits the
// accepted (checkpoint, log) entries from that quorum, and feeds rejection ballots back
// into the max-ballot calculation.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    a_to_proposers_p1b: Stream<
        (Ballot, Result<(Option<usize>, P), Ballot>),
        Cluster<'a, Proposer>,
        Unbounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_has_largest_ballot: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    quorum_size: usize,
    num_quorum_participants: usize,
) -> (
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (quorums, fails) =
        collect_quorum_with_response(a_to_proposers_p1b, quorum_size, num_quorum_participants);

    // Collect the first `quorum_size` logs per ballot, keep the highest ballot's
    // quorum, and accept it only if it matches our current ballot.
    let p_received_quorum_of_p1bs = quorums
        .into_keyed()
        .assume_ordering::<TotalOrder>(nondet!(
            /// We use `flatten_unordered` later
        ))
        .fold_early_stop(
            q!(|| vec![]),
            q!(move |logs, log| {
                logs.push(log);
                logs.len() >= quorum_size
            }),
        )
        .get_max_key()
        .snapshot(
            proposer_tick,
            nondet!(
                /// If the max_by_key result is stale in this snapshot, we might be delayed in
                /// realizing that we are the leader. This does not result in any safety issues.
                /// In the meantime, ballots that acceptors rejected might be fed back into the max
                /// ballot calculation, and change `p_ballot`. By using an async snapshot, we might
                /// miss a chance to become the leader for a short period of time (until the rejected
                /// ballots are processed). This is fine, if there is a higher ballot somewhere out
                /// there, we should not be the leader anyways.
            ),
        )
        .zip(p_ballot.clone())
        .filter_map(q!(
            move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
                Some(quorum_accepted)
            } else {
                None
            }
        ));

    let p_is_leader = p_received_quorum_of_p1bs
        .clone()
        .is_some()
        .and(p_has_largest_ballot);

    (
        p_is_leader,
        // we used an unordered accumulator, so flattened has no order
        p_received_quorum_of_p1bs.flatten_unordered(),
        fails.map(q!(|(_, ballot)| ballot)),
    )
}
585
/// Merges the accepted logs from a quorum of p1bs and computes what the new leader
/// must re-propose under its own ballot: for each slot, the value accepted under the
/// highest ballot (skipping slots at or below the max checkpoint), plus `None` no-ops
/// for holes between the checkpoint and the maximum slot. Also returns that max slot.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
    accepted_logs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    f: usize,
) -> (
    Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    // Largest checkpoint reported by any log in the quorum (None if no log had one).
    let p_p1b_max_checkpoint = accepted_logs
        .clone()
        .filter_map(q!(|(checkpoint, _log)| checkpoint))
        .max()
        .into_singleton();
    // Per slot: the entry with the highest ballot, plus how many logs carry that value.
    let p_p1b_highest_entries_and_count = accepted_logs
        .map(q!(|(_checkpoint, log)| log))
        .flatten_unordered() // Convert HashMap log back to stream
        .into_keyed()
        .fold::<(usize, Option<LogValue<P>>), _, _, _, _, _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
            if let Some(curr_entry_payload) = &mut curr_entry.1 {
                let same_values = new_entry.value == curr_entry_payload.value;
                let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
                // Increment count if the values are the same
                if same_values {
                    curr_entry.0 += 1;
                }
                // Replace the ballot with the largest one
                if higher_ballot {
                    curr_entry_payload.ballot = new_entry.ballot;
                    // Replace the value with the one from the largest ballot, if necessary
                    if !same_values {
                        curr_entry.0 = 1;
                        curr_entry_payload.value = new_entry.value;
                    }
                }
            } else {
                *curr_entry = (1, Some(new_entry));
            }
        }, commutative = manual_proof!(/** TODO */)))
        .map(q!(|(count, entry)| (count, entry.unwrap())));
    let p_log_to_try_commit = p_p1b_highest_entries_and_count
        .clone()
        .entries()
        .cross_singleton(p_ballot.clone())
        .cross_singleton(p_p1b_max_checkpoint.clone())
        .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
            // NOTE(review): slots whose winning value already appears in more than f
            // logs are skipped — presumably because a majority already accepted it,
            // so recommitting is unnecessary; confirm against the protocol argument.
            if count > f {
                return None;
            } else if let Some(checkpoint) = checkpoint
                && slot <= checkpoint
            {
                return None;
            }
            Some(((slot, ballot), entry.value))
        }));
    let p_max_slot = p_p1b_highest_entries_and_count.clone().keys().max();
    let p_proposed_slots = p_p1b_highest_entries_and_count.clone().keys();
    // Slots with no entry in any log become explicit `None` holes under our ballot.
    let p_log_holes = p_max_slot
        .clone()
        .zip(p_p1b_max_checkpoint)
        .flat_map_ordered(q!(|(max_slot, checkpoint)| {
            if let Some(checkpoint) = checkpoint {
                (checkpoint + 1)..max_slot
            } else {
                0..max_slot
            }
        }))
        .filter_not_in(p_proposed_slots)
        .cross_singleton(p_ballot.clone())
        .map(q!(move |(slot, ballot)| ((slot, ballot), None)));

    (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
}
664
/// Phase 2 of Paxos: assigns slots to incoming payloads, recommits values recovered
/// during leader election, sends p2as to the acceptors, and collects p2b quorums.
///
/// Returns the committed `(slot, payload)` stream on the proposers, the acceptors'
/// `(checkpoint, log)` singleton, and the ballots from rejected p2bs (fed back into
/// leader election's max-ballot calculation).
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
fn sequence_payload<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,

    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,

    p_relevant_p1bs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    f: usize,

    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,

    nondet_commit: NonDet,
) -> (
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (p_log_to_recommit, p_max_slot) =
        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);

    // Assign slots to new payloads; non-leaders drop their batches entirely.
    let indexed_payloads = index_payloads(
        p_max_slot,
        c_to_proposers
            .batch(
                proposer_tick,
                nondet!(
                    /// We batch payloads so that we can compute the correct slot based on
                    /// base slot. In the case of a leader re-election, the base slot is updated which
                    /// affects the computed payload slots. This non-determinism can lead to non-determinism
                    /// in which payloads are committed when the leader is changing, which is documented at
                    /// the function level
                    nondet_commit
                ),
            )
            .filter_if(p_is_leader.clone()),
    );

    // New payloads (Some) plus recommitted entries/holes from leader election.
    let payloads_to_send = indexed_payloads
        .cross_singleton(p_ballot.clone())
        .map(q!(|((slot, payload), ballot)| (
            (slot, ballot),
            Some(payload)
        )))
        .chain(p_log_to_recommit)
        .filter_if(p_is_leader)
        .all_ticks_atomic();

    let (a_log, a_to_proposers_p2b) = acceptor_p2(
        acceptor_tick,
        a_max_ballot.clone(),
        payloads_to_send
            .clone()
            .end_atomic()
            .map(q!(move |((slot, ballot), value)| P2a {
                sender: CLUSTER_SELF_ID.clone(),
                ballot,
                slot,
                value
            }))
            .broadcast(acceptors, TCP.fail_stop().bincode(), nondet!(/** TODO */))
            .values(),
        a_checkpoint,
        proposers,
    );

    // TODO: only persist if we are the leader
    let (quorums, fails) = collect_quorum(a_to_proposers_p2b, f + 1, 2 * f + 1);

    // Re-attach each committed (slot, ballot) to its payload.
    let p_to_replicas = join_responses(
        quorums.map(q!(|k| (k, ()))),
        payloads_to_send.batch_atomic(
            proposer_tick,
            nondet!(
                /// The metadata will always be generated before we get a quorum
                /// because our batch of `payloads_to_send` is at least after
                /// what we sent to the acceptors.
            ),
        ),
    );

    (
        p_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))),
        a_log,
        fails.map(q!(|(_, ballot)| ballot)),
    )
}
770
// Proposer logic to assign contiguous slot indices to batches of incoming payloads.
// Slots begin at `p_max_slot + 1` when phase 1 recovered a max slot, otherwise at a
// persisted `next_slot` counter, which advances by the number of payloads indexed.
pub fn index_payloads<'a, L: Location<'a> + NoTick, P: PaxosPayload>(
    p_max_slot: Optional<usize, Tick<L>, Bounded>,
    c_to_proposers: Stream<P, Tick<L>, Bounded>,
) -> Stream<(usize, P), Tick<L>, Bounded> {
    let tick = c_to_proposers.location().clone();
    let sliced_result = sliced! {
        // Persisted counter for the next free slot, starting at 0.
        let mut next_slot = use::state(|l| l.singleton(q!(0)));
        let updated_max_slot = use::atomic(p_max_slot.latest_atomic(), nondet!(/** up to date with tick input */));
        let payload_batch = use::atomic(c_to_proposers.all_ticks_atomic(), nondet!(/** up to date with tick input */));

        // Prefer resuming after the reconciled max slot from p1bs, if present.
        let next_slot_after_reconciling_p1bs = updated_max_slot.map(q!(|s| s + 1));
        let base_slot = next_slot_after_reconciling_p1bs.unwrap_or(next_slot);

        let indexed_payloads = payload_batch
            .enumerate()
            .cross_singleton(base_slot.clone())
            .map(q!(|((index, payload), base_slot)| (
                base_slot + index,
                payload
            )));

        // Advance the counter past everything indexed this tick.
        let num_payloads = indexed_payloads.clone().count();
        next_slot = num_payloads
            .zip(base_slot)
            .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));

        yield_atomic(indexed_payloads)
    };
    sliced_result.batch_atomic(&tick, nondet!(/** up to date with tick input */))
}
802
/// Acceptor-side phase 2 logic: applies incoming p2a messages to the local
/// accepted-value log and replies to proposers with p2b responses.
///
/// # Arguments
/// * `acceptor_tick` - the tick on the acceptor cluster driving batch processing.
/// * `a_max_ballot` - the highest ballot this acceptor has seen (presumably
///   maintained by phase 1 logic elsewhere in this file — confirm at call site).
/// * `p_to_acceptors_p2a` - incoming p2a messages from proposers.
/// * `a_checkpoint` - checkpointed slot number used as a garbage-collection
///   watermark for the log.
/// * `proposers` - the proposer cluster to which p2b responses are demuxed.
///
/// # Returns
/// A tuple of:
/// * the acceptor's log as `(checkpoint, slot -> highest-ballot accepted value)`;
/// * the p2b response stream at the proposers: `((slot, ballot), Ok(()))` on
///   acceptance, or `((slot, ballot), Err(max_ballot))` on rejection.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    proposers: &Cluster<'a, S>,
) -> (
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
) {
    let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.batch(
        acceptor_tick,
        nondet!(
            /// We use batches to ensure that the log is updated before sending
            /// a confirmation to the proposer. Because we use `persist()` on these
            /// messages before folding into the log, non-deterministic batch boundaries
            /// will not affect the eventual log state.
        ),
    );

    let a_checkpoint = a_checkpoint.snapshot(
        acceptor_tick,
        nondet!(
            /// We can arbitrarily snapshot the checkpoint sequence number,
            /// since a delayed garbage collection does not affect correctness.
        ),
    );
    // .inspect(q!(|min_seq| println!("Acceptor new checkpoint: {:?}", min_seq)));

    // Keep only p2as whose ballot is at least the current max ballot; stale
    // proposals from deposed leaders are dropped before touching the log.
    let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
        .clone()
        .cross_singleton(a_max_ballot.clone()) // Don't consider p2as if the current ballot is higher
        .filter_map(q!(|(p2a, max_ballot)|
            if p2a.ballot >= max_ballot {
                Some((p2a.slot, LogValue {
                    ballot: p2a.ballot,
                    value: p2a.value,
                }))
            } else {
                None
            }
        ));
    // Fold accepted values into a per-slot log, keeping the highest-ballot entry
    // per slot; the checkpoint acts as the watermark for garbage collection.
    let a_log = a_p2as_to_place_in_log.across_ticks(|s| {
        s.into_keyed().reduce_watermark(
            a_checkpoint.clone(),
            q!(|prev_entry, entry| {
                // Insert p2a into the log if it has a higher ballot than what was there before
                if entry.ballot > prev_entry.ballot {
                    *prev_entry = LogValue {
                        ballot: entry.ballot,
                        value: entry.value,
                    };
                }
            }, commutative = manual_proof!(/** max by ballot (TODO: not if two entries with same ballot, need assume) */)),
        )
    });
    // Materialize the keyed log into a plain HashMap snapshot for phase 1 use.
    let a_log_snapshot = a_log.entries().fold(
        q!(|| HashMap::new()),
        q!(
            |map, (slot, entry)| {
                map.insert(slot, entry);
            },
            commutative = manual_proof!(/** inserting elements into map is commutative because no keys will overlap */)
        ),
    );

    // Reply per p2a: Ok(()) only if the p2a's ballot equals the current max
    // ballot; otherwise reject and report the max ballot so the proposer can
    // learn it has been superseded.
    let a_to_proposers_p2b = p_to_acceptors_p2a_batch
        .cross_singleton(a_max_ballot)
        .map(q!(|(p2a, max_ballot)| (
            p2a.sender,
            (
                (p2a.slot, p2a.ballot.clone()),
                if p2a.ballot == max_ballot {
                    Ok(())
                } else {
                    Err(max_ballot)
                }
            )
        )))
        .all_ticks()
        .demux(proposers, TCP.fail_stop().bincode())
        .values();

    (
        a_checkpoint
            .into_singleton()
            .zip(a_log_snapshot)
            .latest_atomic(),
        a_to_proposers_p2b,
    )
}
899
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::index_payloads;

    /// Payloads should receive consecutive slots starting at 0 when no
    /// external max slot is ever provided.
    #[test]
    fn proposer_indexes_payloads() {
        let mut builder = FlowBuilder::new();
        let proc = builder.process::<()>();
        let proc_tick = proc.tick();

        let (payload_tx, payload_stream) = proc.sim_input();
        let slot_assigned = index_payloads(
            proc_tick.none(),
            payload_stream.batch(&proc_tick, nondet!(/** test */)),
        );

        let output = slot_assigned.all_ticks().sim_output();

        builder.sim().exhaustive(async || {
            for payload in [1, 2, 3, 4] {
                payload_tx.send(payload);
            }

            let expected = [(0, 1), (1, 2), (2, 3), (3, 4)];
            output.assert_yields_only(expected).await;
        });
    }

    /// When a new max slot (123) becomes visible, slot numbering must jump to
    /// 124; before that point numbering stays sequential.
    #[test]
    fn proposer_indexes_payloads_jumps_on_new_max() {
        let mut builder = FlowBuilder::new();
        let proc = builder.process::<()>();
        let proc_tick = proc.tick();

        let (payload_tx, payload_stream) = proc.sim_input();
        let new_base = proc
            .source_iter(q!([123]))
            .batch(&proc_tick, nondet!(/** test */))
            .first();
        let slot_assigned = index_payloads(
            new_base,
            payload_stream.batch(&proc_tick, nondet!(/** test */)),
        );

        let output = slot_assigned.all_ticks().sim_output();

        builder.sim().exhaustive(async || {
            for payload in [1, 2, 3, 4] {
                payload_tx.send(payload);
            }

            let mut expected_slot = 0;
            for expected_value in 1..=4 {
                let (slot, value) = output.next().await.unwrap();
                assert_eq!(value, expected_value);

                // Either the slot continues sequentially, or — only before the
                // jump point — it may leap to 124 once base slot 123 is seen.
                let sequential = slot == expected_slot;
                let jumped = expected_slot < 123 && slot == 124;
                assert!(sequential || jumped);

                expected_slot = slot + 1;
            }
        });
    }
}