1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::time::Duration;
5
6use hydro_lang::live_collections::sliced::yield_atomic;
7use hydro_lang::live_collections::stream::{AtLeastOnce, NoOrder, TotalOrder};
8use hydro_lang::location::cluster::CLUSTER_SELF_ID;
9use hydro_lang::location::{Atomic, Location, MemberId, NoTick};
10use hydro_lang::prelude::*;
11use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
12use hydro_std::request_response::join_responses;
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16use super::paxos_with_client::PaxosLike;
17
/// Marker type tagging the proposer cluster.
///
/// It carries no data; it only appears as a type parameter, e.g. `Cluster<'a, Proposer>` and
/// `MemberId<Proposer>`.
#[derive(Serialize, Deserialize, Clone)]
pub struct Proposer {}
/// Marker type tagging the acceptor cluster (used as `Cluster<'a, Acceptor>`); carries no data.
pub struct Acceptor {}
21
/// Tunable parameters for a Paxos deployment.
///
/// Both timeouts and the stagger delay are whole seconds: each value is passed to
/// `Duration::from_secs` where it is used.
///
/// The type is plain data, so it derives `Debug`, `PartialEq` and `Eq` in addition to
/// `Clone`/`Copy`; this lets configurations be logged and compared in tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PaxosConfig {
    /// Fault-tolerance parameter: quorums are `f + 1` responses out of `2 * f + 1` participants.
    pub f: usize,
    /// Period, in seconds, at which a leader re-broadcasts its ballot as an "I am leader"
    /// heartbeat to the proposers.
    pub i_am_leader_send_timeout: u64,
    /// Heartbeat timeout, in seconds, after which a non-leader considers the leader expired.
    /// The same value is used as the period of the election-check interval.
    pub i_am_leader_check_timeout: u64,
    /// Multiplied by the proposer's raw member id to get that proposer's initial delay, in
    /// seconds, before its election-check interval starts (staggers the proposers).
    pub i_am_leader_check_timeout_delay_multiplier: usize,
}
33
34pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
35impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}
36
/// A Paxos ballot: a round number paired with the id of the proposer that owns it.
///
/// Ballots are totally ordered by `num` first and `proposer_id` second (see the `Ord` impl
/// below), so two proposers never hold equal ballots unless they are the same proposer.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)]
pub struct Ballot {
    /// Round number; the most significant component of the ordering.
    pub num: u32,
    /// Proposer that issued this ballot; breaks ties between equal `num`s and is used to
    /// route P1b replies back to the sender.
    pub proposer_id: MemberId<Proposer>,
}
42
43impl Ord for Ballot {
44 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45 self.num
46 .cmp(&other.num)
47 .then_with(|| self.proposer_id.cmp(&other.proposer_id))
48 }
49}
50
51impl PartialOrd for Ballot {
52 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
53 Some(self.cmp(other))
54 }
55}
56
/// One entry of an acceptor's log.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LogValue<P> {
    /// Ballot of the P2a message that wrote this entry.
    pub ballot: Ballot,
    /// Value stored in the slot. `None` is what a newly elected leader proposes for holes in
    /// the log; client payloads are always wrapped in `Some`.
    pub value: Option<P>,
}
62
/// Phase-2a message sent from a proposer to the acceptors, asking them to store `value` in
/// `slot` under `ballot`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct P2a<P, S> {
    /// Member that sent the message; the acceptor addresses its P2b reply to this id.
    pub sender: MemberId<S>,
    /// Ballot under which the value is proposed.
    pub ballot: Ballot,
    /// Log position being written.
    pub slot: usize,
    /// Proposed value; `None` fills a hole in the log.
    pub value: Option<P>,
}
70
/// Handle bundling the two clusters and the configuration needed to build the core Paxos
/// dataflow through the `PaxosLike` trait.
pub struct CorePaxos<'a> {
    /// Cluster running the proposer role; receives client payloads and emits sequenced output.
    pub proposers: Cluster<'a, Proposer>,
    /// Cluster running the acceptor role; stores the log.
    pub acceptors: Cluster<'a, Acceptor>,
    /// Fault-tolerance and timing parameters.
    pub paxos_config: PaxosConfig,
}
76
77impl<'a> PaxosLike<'a> for CorePaxos<'a> {
78 type PaxosIn = Proposer;
79 type PaxosLog = Acceptor;
80 type PaxosOut = Proposer;
81 type Ballot = Ballot;
82
83 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
84 &self.proposers
85 }
86
87 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
88 &self.acceptors
89 }
90
91 fn get_recipient_from_ballot<L: Location<'a>>(
92 ballot: Optional<Self::Ballot, L, Unbounded>,
93 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
94 ballot.map(q!(|ballot| ballot.proposer_id))
95 }
96
97 fn build<P: PaxosPayload>(
98 self,
99 with_ballot: impl FnOnce(
100 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
101 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
102 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
103 nondet_leader: NonDet,
104 nondet_commit: NonDet,
105 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
106 paxos_core(
107 &self.proposers,
108 &self.acceptors,
109 a_checkpoint,
110 with_ballot,
111 self.paxos_config,
112 nondet_leader,
113 nondet_commit,
114 )
115 .1
116 }
117}
118
/// Builds the core Paxos dataflow: leader election followed by payload sequencing.
///
/// # Parameters
/// - `proposers` / `acceptors`: the two clusters the protocol runs on.
/// - `a_checkpoint`: latest checkpointed slot known at each acceptor, forwarded to
///   `sequence_payload`.
/// - `c_to_proposers`: callback that receives the stream of ballots emitted whenever this
///   proposer has just become leader, and returns the stream of client payloads arriving at
///   the proposers.
/// - `config`: fault-tolerance parameter `f` and heartbeat timings.
/// - `nondet_leader` / `nondet_commit`: non-determinism guards supplied by the caller and
///   forwarded to the sub-components.
///
/// # Returns
/// A pair of streams located at the proposers:
/// 1. the proposer's ballot, emitted on ticks where it has just become leader;
/// 2. sequenced `(slot, value)` pairs, where the value is `None` for log holes.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn paxos_core<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    c_to_proposers: impl FnOnce(
        Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
    config: PaxosConfig,
    nondet_leader: NonDet,
    nondet_commit: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let f = config.f;

    // Startup banners printed once on every member of each cluster.
    proposers
        .source_iter(q!(["Proposers say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    acceptors
        .source_iter(q!(["Acceptors say hello"]))
        .for_each(q!(|s| println!("{}", s)));

    let proposer_tick = proposers.tick();
    let acceptor_tick = acceptors.tick();

    // Leader election consumes two values that are only produced later by sequencing (the
    // ballots carried by failed P2bs and the acceptor log), so both are declared as forward
    // references here and completed at the end of the function.
    let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
        proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
    let (a_log_complete_cycle, a_log_forward_reference) =
        acceptor_tick.forward_ref::<Singleton<_, _, _>>();

    // Quorum is f + 1 responses out of 2f + 1 participants.
    let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        f + 1,
        2 * f + 1,
        config,
        sequencing_max_ballot_forward_reference,
        a_log_forward_reference,
        nondet!(
            /// Forwards the caller's `nondet_leader` guard to leader election.
            nondet_leader
        ),
        nondet!(
            /// Forwards the caller's `nondet_commit` guard; leader election uses it when
            /// batching P1a messages at the acceptors.
            nondet_commit
        ),
    );

    // `was_not_leader` is true when the leader flag was not set in the previous tick (the
    // flag is turned into an optional and deferred by one tick), so `just_became_leader`
    // holds only on the tick in which leadership is gained.
    let was_not_leader = p_is_leader
        .clone()
        .map(q!(|is_leader| is_leader.then_some(())))
        .into_optional()
        .defer_tick()
        .is_none();
    let just_became_leader = p_is_leader.clone().and(was_not_leader);

    // Hand the caller the stream of new-leader ballots and obtain the payload stream back.
    let c_to_proposers = c_to_proposers(
        p_ballot
            .clone()
            .filter_if(just_became_leader.clone())
            .all_ticks(),
    );

    let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
        proposers,
        acceptors,
        &proposer_tick,
        &acceptor_tick,
        c_to_proposers,
        a_checkpoint,
        p_ballot.clone(),
        p_is_leader,
        p_relevant_p1bs,
        f,
        a_max_ballot,
        nondet!(
            /// Forwards the caller's `nondet_commit` guard to payload sequencing.
            nondet_commit
        ),
    );

    // Close the two forward references declared above.
    a_log_complete_cycle.complete(a_log.snapshot_atomic(
        &acceptor_tick,
        nondet!(
            /// NOTE(review): this snapshot of the acceptor log is what gets attached to P1b
            /// replies; the justification for the snapshot's non-determinism is not
            /// recorded here. TODO document.
        ),
    ));
    sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);

    (
        p_ballot.filter_if(just_became_leader).all_ticks(),
        p_to_replicas,
    )
}
243
/// Builds the leader-election (phase 1) part of Paxos.
///
/// Each proposer tracks the largest ballot it has heard of, derives its own ballot from it,
/// exchanges "I am leader" heartbeats with the other proposers and, when an election is
/// triggered, broadcasts a P1a to the acceptors and collects their P1b replies.
///
/// # Parameters
/// - `quorum_size` / `num_quorum_participants`: passed through to the P1b quorum collection.
/// - `p_received_p2b_ballots`: ballots learned from rejected P2bs during sequencing.
/// - `a_log`: per-tick snapshot of the acceptor log, attached to successful P1b replies.
/// - `nondet_leader` / `nondet_acceptor_ballot`: non-determinism guards from the caller.
///
/// # Returns
/// `(p_ballot, p_is_leader, p_accepted_values, a_max_ballot)`: the proposer's current ballot,
/// whether it is currently leader, the logs returned by the P1b quorum for its ballot, and
/// the largest ballot each acceptor has seen in a P1a.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
pub fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    quorum_size: usize,
    num_quorum_participants: usize,
    paxos_config: PaxosConfig,
    p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
    a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
    nondet_leader: NonDet,
    nondet_acceptor_ballot: NonDet,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
    // Forward references for values produced further down: ballots from rejected P1bs,
    // heartbeats from other proposers, and the leader flag.
    let (p1b_fail_complete, p1b_fail) =
        proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
        proposers.forward_ref::<Stream<_, _, _, NoOrder, AtLeastOnce>>();
    let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
        proposer_tick.forward_ref::<Singleton<bool, _, _>>();
    // Largest ballot heard from any source (rejected P1bs, rejected P2bs, heartbeats);
    // defaults to ballot number 0 of member 0 before anything has been received.
    let p_received_max_ballot = p1b_fail
        .merge_unordered(p_received_p2b_ballots)
        .merge_unordered(p_to_proposers_i_am_leader_forward_ref)
        .max()
        .unwrap_or(
            proposers
                .singleton(q!(Ballot {
                    num: 0,
                    proposer_id: MemberId::from_raw_id(0)
                }))
                .into(),
        );

    let (p_ballot, p_has_largest_ballot) = p_ballot_calc(p_received_max_ballot.snapshot(
        proposer_tick,
        nondet!(
            /// Forwards the caller's `nondet_leader` guard: the snapshot of the largest
            /// received ballot may lag behind what has actually arrived.
            nondet_leader
        ),
    ));

    let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
        proposers,
        proposer_tick,
        p_is_leader_forward_ref,
        p_ballot.clone(),
        paxos_config,
        nondet!(
            /// Forwards the caller's `nondet_leader` guard to the timer-driven heartbeat
            /// and re-election logic.
            nondet_leader
        ),
    );

    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);

    // When an election is triggered, send our ballot as a P1a to every acceptor.
    let p_to_acceptors_p1a = p_ballot
        .clone()
        .filter_if(p_trigger_election)
        .all_ticks()
        .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
        .broadcast(acceptors, TCP.fail_stop().bincode(), nondet!(/** TODO(review): document why non-determinism in this broadcast is acceptable */))
        .values();

    let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
        acceptor_tick,
        p_to_acceptors_p1a.batch(
            acceptor_tick,
            nondet!(
                /// Forwards the caller's `nondet_acceptor_ballot` guard: which P1as land
                /// in which acceptor tick depends on batching.
                nondet_acceptor_ballot
            ),
        ),
        a_log,
        proposers,
    );

    let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
        proposer_tick,
        a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
        p_ballot.clone(),
        p_has_largest_ballot,
        quorum_size,
        num_quorum_participants,
    );
    // Close the remaining forward references.
    p_is_leader_complete_cycle.complete(p_is_leader.clone());
    p1b_fail_complete.complete(fail_ballots);

    (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
}
350
/// Derives this proposer's ballot from the largest ballot it has received.
///
/// A ballot number is kept as state (starting at 0). Whenever the received maximum is
/// strictly greater than the proposer's own ballot, the stored number jumps to the received
/// number plus one; otherwise it is left unchanged.
///
/// # Returns
/// `(p_ballot, p_has_largest_ballot)`: the proposer's ballot (stored number plus its own
/// member id) and whether the received maximum is less than or equal to that ballot.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_ballot_calc<'a>(
    p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
) -> (
    Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    let tick = p_received_max_ballot.location().clone();
    let (p_ballot, p_has_largest_ballot) = sliced! {
        let p_received_max_ballot = use::atomic(p_received_max_ballot.latest_atomic(), nondet!(/** TODO(review): document why reading the latest received ballot per slice is acceptable */));
        // Ballot number carried from one slice to the next; starts at 0.
        let mut p_ballot_num = use::state(|l| l.singleton(q!(0)));

        p_ballot_num = p_received_max_ballot
            .clone()
            .zip(p_ballot_num)
            .map(q!(move |(received_max_ballot, ballot_num)| {
                // Compare against our own ballot (current number + our member id).
                if received_max_ballot
                    > (Ballot {
                        num: ballot_num,
                        proposer_id: CLUSTER_SELF_ID.clone(),
                    })
                {
                    // Someone holds a larger ballot: move past its round number.
                    received_max_ballot.num + 1
                } else {
                    ballot_num
                }
            }));

        let p_ballot = p_ballot_num.clone().map(q!(move |num| Ballot {
            num,
            proposer_id: CLUSTER_SELF_ID.clone()
        }));

        // Computed against the freshly updated ballot.
        let p_has_largest_ballot = p_received_max_ballot
            .zip(p_ballot.clone())
            .map(q!(
                |(received_max_ballot, cur_ballot)| received_max_ballot <= cur_ballot
            ));

        (yield_atomic(p_ballot), yield_atomic(p_has_largest_ballot))
    };

    // Bring both results back into the tick the input came from.
    (
        p_ballot.snapshot_atomic(
            &tick,
            nondet!(/** TODO(review): document why snapshotting the ballot back into the tick is acceptable */),
        ),
        p_has_largest_ballot.snapshot_atomic(
            &tick,
            nondet!(/** TODO(review): document why snapshotting the flag back into the tick is acceptable */),
        ),
    )
}
405
/// Builds the "I am leader" heartbeat and the election trigger.
///
/// # Parameters
/// - `p_is_leader`: whether this proposer currently believes it is leader.
/// - `p_ballot`: this proposer's current ballot, used as the heartbeat payload.
/// - `paxos_config`: supplies the heartbeat period, the expiry timeout and the per-proposer
///   stagger multiplier (all in seconds).
/// - `nondet_reelection`: non-determinism guard forwarded to the timer-based operators.
///
/// # Returns
/// `(p_to_proposers_i_am_leader, p_trigger_election)`: the heartbeats received from other
/// proposers, and a per-tick flag that is true when the leader is considered expired and this
/// proposer's election-check interval fired in the same tick.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_leader_heartbeat<'a>(
    proposers: &Cluster<'a, Proposer>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    paxos_config: PaxosConfig,
    nondet_reelection: NonDet,
) -> (
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder, AtLeastOnce>,
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    // Copy the fields into locals so the `q!` closures below capture plain integers.
    let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
    let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
    let i_am_leader_check_timeout_delay_multiplier =
        paxos_config.i_am_leader_check_timeout_delay_multiplier;

    // While leader, periodically broadcast our ballot to all proposers.
    let p_to_proposers_i_am_leader = p_ballot
        .filter_if(p_is_leader.clone())
        .latest()
        .sample_every(
            q!(Duration::from_secs(i_am_leader_send_timeout)),
            nondet!(
                /// Forwards the caller's `nondet_reelection` guard: which ballot value is
                /// sampled depends on timing.
                nondet_reelection
            ),
        )
        .broadcast(proposers, TCP.fail_stop().bincode(), nondet!(/** TODO(review): document why non-determinism in this broadcast is acceptable */))
        .values();

    // Presumably `timeout` yields a value once no heartbeat has arrived for the configured
    // duration (verify against the hydro API). A proposer that is leader itself never
    // treats the leader as expired.
    let p_leader_expired = p_to_proposers_i_am_leader
        .clone()
        .timeout(
            q!(Duration::from_secs(i_am_leader_check_timeout)),
            nondet!(
                /// Forwards the caller's `nondet_reelection` guard: expiry depends on
                /// wall-clock timing of heartbeat arrival.
                nondet_reelection
            ),
        )
        .snapshot(proposer_tick, nondet!(/** TODO(review): document why snapshotting the expiry signal is acceptable */))
        .filter_if(!p_is_leader);

    // Only act on expiry in ticks where the election-check interval fired. The first
    // duration scales with the member's raw id, so different proposers start checking at
    // different times.
    let p_trigger_election = p_leader_expired.is_some().and(
        proposers
            .source_interval_delayed(
                q!(Duration::from_secs(
                    (CLUSTER_SELF_ID.get_raw_id()
                        * i_am_leader_check_timeout_delay_multiplier as u32)
                        .into()
                )),
                q!(Duration::from_secs(i_am_leader_check_timeout)),
                nondet!(
                    /// Forwards the caller's `nondet_reelection` guard: interval firings
                    /// are timing dependent.
                    nondet_reelection
                ),
            )
            .batch(proposer_tick, nondet!(/** TODO(review): document why batching interval firings is acceptable */))
            .first()
            .is_some(),
    );
    (p_to_proposers_i_am_leader, p_trigger_election)
}
475
476#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
477fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
478 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
479 p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
480 a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
481 proposers: &Cluster<'a, Proposer>,
482) -> (
483 Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
484 Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
485) {
486 let a_max_ballot = p_to_acceptors_p1a
487 .clone()
488 .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
489 .across_ticks(|s| s.max())
490 .unwrap_or(acceptor_tick.singleton(q!(Ballot {
491 num: 0,
492 proposer_id: MemberId::from_raw_id(0)
493 })));
494
495 (
496 a_max_ballot.clone(),
497 p_to_acceptors_p1a
498 .cross_singleton(a_max_ballot)
499 .cross_singleton(a_log)
500 .map(q!(|((ballot, max_ballot), log)| (
501 ballot.proposer_id.clone(),
502 (
503 ballot.clone(),
504 if ballot == max_ballot {
505 Ok(log)
506 } else {
507 Err(max_ballot)
508 }
509 )
510 )))
511 .all_ticks()
512 .demux(proposers, TCP.fail_stop().bincode())
513 .values(),
514 )
515}
516
/// Proposer side of phase 1b: decides leadership from the P1b replies.
///
/// # Parameters
/// - `a_to_proposers_p1b`: `(ballot, Ok(log) | Err(larger ballot))` replies from acceptors.
/// - `p_ballot`: this proposer's current ballot.
/// - `p_has_largest_ballot`: whether no larger ballot has been heard of.
/// - `quorum_size` / `num_quorum_participants`: passed to `collect_quorum_with_response`.
///
/// # Returns
/// `(p_is_leader, accepted logs, failed ballots)`: the leader flag (a quorum exists for the
/// current ballot and that ballot is the largest known), the logs collected from that quorum,
/// and the ballots carried by rejecting replies.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    a_to_proposers_p1b: Stream<
        (Ballot, Result<(Option<usize>, P), Ballot>),
        Cluster<'a, Proposer>,
        Unbounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_has_largest_ballot: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    quorum_size: usize,
    num_quorum_participants: usize,
) -> (
    Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
    Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    let (quorums, fails) =
        collect_quorum_with_response(a_to_proposers_p1b, quorum_size, num_quorum_participants);

    // Per ballot, gather responses until `quorum_size` of them are collected, then keep the
    // group with the largest ballot and accept it only if that ballot is ours.
    let p_received_quorum_of_p1bs = quorums
        .into_keyed()
        .assume_ordering::<TotalOrder>(nondet!(
            /// TODO(review): record why treating arrival order as total is acceptable; the
            /// fold below only collects responses until `quorum_size` are gathered.
        ))
        .fold_early_stop(
            q!(|| vec![]),
            q!(move |logs, log| {
                logs.push(log);
                // Returning true stops the fold once a quorum's worth has been collected.
                logs.len() >= quorum_size
            }),
        )
        .get_max_key()
        .snapshot(
            proposer_tick,
            nondet!(
                /// TODO(review): record why a possibly stale snapshot of the collected
                /// quorum is acceptable; a result is only used when its ballot equals the
                /// proposer's current ballot.
            ),
        )
        .zip(p_ballot.clone())
        .filter_map(q!(
            move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
                Some(quorum_accepted)
            } else {
                None
            }
        ));

    let p_is_leader = p_received_quorum_of_p1bs
        .clone()
        .is_some()
        .and(p_has_largest_ballot);

    (
        p_is_leader,
        p_received_quorum_of_p1bs.flatten_unordered(),
        // Drop the first tuple component and keep only the ballot from each failure.
        fails.map(q!(|(_, ballot)| ballot)),
    )
}
585
/// Computes what a newly elected leader must re-propose from the logs returned in P1bs.
///
/// # Parameters
/// - `accepted_logs`: `(checkpoint, log)` pairs, one per acceptor in the P1b quorum.
/// - `p_ballot`: the leader's ballot, attached to every re-proposed entry.
/// - `f`: fault-tolerance parameter; entries seen more than `f` times are not re-proposed.
///
/// # Returns
/// - A stream of `((slot, ballot), value)` to propose: surviving log entries plus `None` for
///   every hole below the largest slot.
/// - The largest slot found in any returned log, if any.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
    accepted_logs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    f: usize,
) -> (
    Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
    Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
    // Largest checkpoint reported by any acceptor (`None` if no acceptor reported one).
    let p_p1b_max_checkpoint = accepted_logs
        .clone()
        .filter_map(q!(|(checkpoint, _log)| checkpoint))
        .max()
        .into_singleton();
    // Per slot: the entry with the highest ballot, and a count of how many logs agree on
    // the currently held value.
    let p_p1b_highest_entries_and_count = accepted_logs
        .map(q!(|(_checkpoint, log)| log))
        .flatten_unordered() // flatten each log into (slot, entry) pairs
        .into_keyed()
        .fold::<(usize, Option<LogValue<P>>), _, _, _, _, _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
            if let Some(curr_entry_payload) = &mut curr_entry.1 {
                let same_values = new_entry.value == curr_entry_payload.value;
                let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
                // Another log agrees on the value: bump the count.
                if same_values {
                    curr_entry.0 += 1;
                }
                // Always keep the largest ballot seen for the slot.
                if higher_ballot {
                    curr_entry_payload.ballot = new_entry.ballot;
                    // A different value under a larger ballot replaces the held one and
                    // restarts the count.
                    if !same_values {
                        curr_entry.0 = 1;
                        curr_entry_payload.value = new_entry.value;
                    }
                }
            } else {
                // First entry seen for this slot.
                *curr_entry = (1, Some(new_entry));
            }
        }, commutative = manual_proof!(/** NOTE(review): commutativity argument not recorded here. TODO document */)))
        // The accumulator is always `Some` after at least one entry, so `unwrap` cannot fail.
        .map(q!(|(count, entry)| (count, entry.unwrap())));
    let p_log_to_try_commit = p_p1b_highest_entries_and_count
        .clone()
        .entries()
        .cross_singleton(p_ballot.clone())
        .cross_singleton(p_p1b_max_checkpoint.clone())
        .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
            // Skip entries that more than `f` logs agree on (presumably already committed;
            // confirm), and entries at or below the checkpoint.
            if count > f {
                return None;
            } else if let Some(checkpoint) = checkpoint
                && slot <= checkpoint
            {
                return None;
            }
            Some(((slot, ballot), entry.value))
        }));
    let p_max_slot = p_p1b_highest_entries_and_count.clone().keys().max();
    let p_proposed_slots = p_p1b_highest_entries_and_count.clone().keys();
    // Holes: every slot after the checkpoint (or from 0) and strictly below the largest
    // slot that no returned log has an entry for. They are proposed with a `None` value.
    let p_log_holes = p_max_slot
        .clone()
        .zip(p_p1b_max_checkpoint)
        .flat_map_ordered(q!(|(max_slot, checkpoint)| {
            if let Some(checkpoint) = checkpoint {
                (checkpoint + 1)..max_slot
            } else {
                0..max_slot
            }
        }))
        .filter_not_in(p_proposed_slots)
        .cross_singleton(p_ballot.clone())
        .map(q!(move |(slot, ballot)| ((slot, ballot), None)));

    (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
}
664
/// Builds phase 2 of Paxos: the leader assigns slots to payloads, sends P2as to the
/// acceptors, and releases a payload once a quorum of P2bs confirms it.
///
/// # Parameters
/// - `c_to_proposers`: client payloads arriving at the proposers.
/// - `a_checkpoint`: latest checkpointed slot at each acceptor.
/// - `p_ballot` / `p_is_leader`: the proposer's ballot and leader flag from leader election.
/// - `p_relevant_p1bs`: logs returned by the P1b quorum, used to re-propose old entries.
/// - `f`: fault-tolerance parameter; the P2b quorum is `f + 1` out of `2 * f + 1`.
/// - `a_max_ballot`: largest ballot each acceptor has seen in a P1a.
/// - `nondet_commit`: non-determinism guard from the caller.
///
/// # Returns
/// `(sequenced payloads, acceptor log, failed ballots)`: `(slot, value)` pairs confirmed by a
/// quorum, the acceptor's `(checkpoint, log)` state, and ballots carried by rejected P2bs.
#[expect(
    clippy::type_complexity,
    clippy::too_many_arguments,
    reason = "internal paxos code // TODO"
)]
fn sequence_payload<'a, P: PaxosPayload>(
    proposers: &Cluster<'a, Proposer>,
    acceptors: &Cluster<'a, Acceptor>,
    proposer_tick: &Tick<Cluster<'a, Proposer>>,
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,

    p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
    p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,

    p_relevant_p1bs: Stream<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Tick<Cluster<'a, Proposer>>,
        Bounded,
        NoOrder,
    >,
    f: usize,

    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,

    nondet_commit: NonDet,
) -> (
    Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
    // Entries and holes a new leader must re-propose, plus the largest slot already in use.
    let (p_log_to_recommit, p_max_slot) =
        recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);

    // Only the leader assigns slots; payloads batched on a non-leader are filtered out.
    let indexed_payloads = index_payloads(
        p_max_slot,
        c_to_proposers
            .batch(
                proposer_tick,
                nondet!(
                    /// Forwards the caller's `nondet_commit` guard: which payloads fall
                    /// into which tick, and thus whether they are seen while leader,
                    /// depends on batching.
                    nondet_commit
                ),
            )
            .filter_if(p_is_leader.clone()),
    );

    // New payloads and re-proposed entries, all tagged with (slot, ballot).
    let payloads_to_send = indexed_payloads
        .cross_singleton(p_ballot.clone())
        .map(q!(|((slot, payload), ballot)| (
            (slot, ballot),
            Some(payload)
        )))
        .chain(p_log_to_recommit)
        .filter_if(p_is_leader)
        .all_ticks_atomic();

    let (a_log, a_to_proposers_p2b) = acceptor_p2(
        acceptor_tick,
        a_max_ballot.clone(),
        payloads_to_send
            .clone()
            .end_atomic()
            .map(q!(move |((slot, ballot), value)| P2a {
                sender: CLUSTER_SELF_ID.clone(),
                ballot,
                slot,
                value
            }))
            .broadcast(acceptors, TCP.fail_stop().bincode(), nondet!(/** TODO(review): document why non-determinism in this broadcast is acceptable */))
            .values(),
        a_checkpoint,
        proposers,
    );

    // Quorum of f + 1 P2bs out of 2f + 1 acceptors, keyed by (slot, ballot).
    let (quorums, fails) = collect_quorum(a_to_proposers_p2b, f + 1, 2 * f + 1);

    // Match each confirmed (slot, ballot) key with the payload that was sent for it.
    let p_to_replicas = join_responses(
        quorums.map(q!(|k| (k, ()))),
        payloads_to_send.batch_atomic(
            proposer_tick,
            nondet!(
                /// TODO(review): record why batching the sent payloads for the join is
                /// acceptable; presumably the atomic section ensures a payload is
                /// registered before its quorum can arrive (confirm).
            ),
        ),
    );

    (
        p_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))),
        a_log,
        fails.map(q!(|(_, ballot)| ballot)),
    )
}
770
/// Assigns consecutive slot numbers to the payloads of each batch.
///
/// A "next slot" counter is kept as state, starting at 0. For each batch the base slot is
/// `p_max_slot + 1` when `p_max_slot` is present, otherwise the stored counter; payloads get
/// `base + index` in batch order, and the counter then becomes `base + batch length`.
///
/// # Parameters
/// - `p_max_slot`: largest slot already in use, if known; when present it overrides the counter.
/// - `c_to_proposers`: the ordered batch of payloads to index.
///
/// # Returns
/// The batch as `(slot, payload)` pairs.
pub fn index_payloads<'a, L: Location<'a> + NoTick, P: PaxosPayload>(
    p_max_slot: Optional<usize, Tick<L>, Bounded>,
    c_to_proposers: Stream<P, Tick<L>, Bounded>,
) -> Stream<(usize, P), Tick<L>, Bounded> {
    let tick = c_to_proposers.location().clone();
    let sliced_result = sliced! {
        // Slot to use next when no max slot is supplied; starts at 0.
        let mut next_slot = use::state(|l| l.singleton(q!(0)));
        let updated_max_slot = use::atomic(p_max_slot.latest_atomic(), nondet!(/** TODO(review): document why reading the latest max slot per slice is acceptable */));
        let payload_batch = use::atomic(c_to_proposers.all_ticks_atomic(), nondet!(/** TODO(review): document why re-batching payloads per slice is acceptable */));

        let next_slot_after_reconciling_p1bs = updated_max_slot.map(q!(|s| s + 1));
        let base_slot = next_slot_after_reconciling_p1bs.unwrap_or(next_slot);

        let indexed_payloads = payload_batch
            .enumerate()
            .cross_singleton(base_slot.clone())
            .map(q!(|((index, payload), base_slot)| (
                base_slot + index,
                payload
            )));

        // Advance the counter past the slots handed out in this batch.
        let num_payloads = indexed_payloads.clone().count();
        next_slot = num_payloads
            .zip(base_slot)
            .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));

        yield_atomic(indexed_payloads)
    };
    sliced_result.batch_atomic(&tick, nondet!(/** TODO(review): document why batching the indexed payloads back into the tick is acceptable */))
}
802
/// Acceptor side of phase 2: stores accepted P2as in the log and answers each with a P2b.
///
/// # Parameters
/// - `a_max_ballot`: largest ballot this acceptor has seen in a P1a.
/// - `p_to_acceptors_p2a`: incoming P2a messages.
/// - `a_checkpoint`: latest checkpointed slot; used as the watermark of the log reduction.
/// - `proposers`: cluster the P2b replies are sent to, addressed by `P2a::sender`.
///
/// # Returns
/// - The acceptor state `(checkpoint, log)`, where the log maps each slot to its entry.
/// - The P2b replies `((slot, ballot), Ok(()) | Err(max ballot))` as received at the senders.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
    acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
    a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
    p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
    a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
    proposers: &Cluster<'a, S>,
) -> (
    Singleton<
        (Option<usize>, HashMap<usize, LogValue<P>>),
        Atomic<Cluster<'a, Acceptor>>,
        Unbounded,
    >,
    Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
) {
    let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.batch(
        acceptor_tick,
        nondet!(
            /// TODO(review): record why batching P2as per tick is acceptable; each P2a is
            /// compared against the max ballot of the tick it lands in.
        ),
    );

    let a_checkpoint = a_checkpoint.snapshot(
        acceptor_tick,
        nondet!(
            /// TODO(review): record why a possibly stale checkpoint snapshot is acceptable.
        ),
    );
    // Only P2as whose ballot is at least the current max ballot are written to the log.
    let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
        .clone()
        .cross_singleton(a_max_ballot.clone())
        .filter_map(q!(|(p2a, max_ballot)|
            if p2a.ballot >= max_ballot {
                Some((p2a.slot, LogValue {
                    ballot: p2a.ballot,
                    value: p2a.value,
                }))
            } else {
                None
            }
        ));
    // Per slot, keep the entry with the highest ballot across all ticks. The checkpoint is
    // passed as the watermark (presumably entries at or below it are dropped; confirm).
    let a_log = a_p2as_to_place_in_log.across_ticks(|s| {
        s.into_keyed().reduce_watermark(
            a_checkpoint.clone(),
            q!(|prev_entry, entry| {
                if entry.ballot > prev_entry.ballot {
                    *prev_entry = LogValue {
                        ballot: entry.ballot,
                        value: entry.value,
                    };
                }
            }, commutative = manual_proof!(/** NOTE(review): this keeps the entry with the strictly greater ballot; order-insensitive provided entries sharing a ballot carry the same value. TODO confirm */)),
        )
    });
    // Materialize the keyed log as a `HashMap` from slot to entry.
    let a_log_snapshot = a_log.entries().fold(
        q!(|| HashMap::new()),
        q!(
            |map, (slot, entry)| {
                map.insert(slot, entry);
            },
            commutative = manual_proof!(/** NOTE(review): presumably each slot appears once in `entries()`, making insertion order irrelevant. TODO confirm */)
        ),
    );

    // Reply to every P2a: `Ok` only when its ballot equals the current max ballot, otherwise
    // report the max ballot back to the sender.
    let a_to_proposers_p2b = p_to_acceptors_p2a_batch
        .cross_singleton(a_max_ballot)
        .map(q!(|(p2a, max_ballot)| (
            p2a.sender,
            (
                (p2a.slot, p2a.ballot.clone()),
                if p2a.ballot == max_ballot {
                    Ok(())
                } else {
                    Err(max_ballot)
                }
            )
        )))
        .all_ticks()
        .demux(proposers, TCP.fail_stop().bincode())
        .values();

    (
        a_checkpoint
            .into_singleton()
            .zip(a_log_snapshot)
            .latest_atomic(),
        a_to_proposers_p2b,
    )
}
899
/// Simulation tests for `index_payloads`.
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::index_payloads;

    /// With no max slot supplied, payloads are numbered consecutively from 0 regardless of
    /// how the simulator splits them into batches.
    #[test]
    fn proposer_indexes_payloads() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let tick = node.tick();

        let (in_send, input_payloads) = node.sim_input();
        let indexed = index_payloads(
            tick.none(),
            input_payloads.batch(&tick, nondet!(/** test: batching chosen by the simulator */)),
        );

        let out_recv = indexed.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);
            in_send.send(4);

            out_recv
                .assert_yields_only([(0, 1), (1, 2), (2, 3), (3, 4)])
                .await;
        });
    }

    /// When a max slot of 123 becomes visible in some tick, numbering jumps to 124 and then
    /// continues consecutively from there.
    #[test]
    fn proposer_indexes_payloads_jumps_on_new_max() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let tick = node.tick();

        let (in_send, input_payloads) = node.sim_input();
        // The max slot is released in whichever tick the simulator picks.
        let release_new_base = node
            .source_iter(q!([123]))
            .batch(&tick, nondet!(/** test: release tick chosen by the simulator */))
            .first();
        let indexed = index_payloads(
            release_new_base,
            input_payloads.batch(&tick, nondet!(/** test: batching chosen by the simulator */)),
        );

        let out_recv = indexed.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);
            in_send.send(4);

            let mut next_expected = 0;
            for i in 1..=4 {
                let (next_slot, v) = out_recv.next().await.unwrap();
                // Payload order is preserved.
                assert_eq!(v, i);

                if next_expected < 123 {
                    // Before the jump, a slot is either the next consecutive one or the
                    // first slot after the released max (124).
                    assert!(next_slot == next_expected || next_slot == 124);
                } else {
                    // After the jump, numbering must be strictly consecutive.
                    assert!(next_slot == next_expected);
                }

                next_expected = next_slot + 1;
            }
        });
    }
}