1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::time::Duration;
5
6use hydro_lang::stream::AtLeastOnce;
7use hydro_lang::*;
8use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
9use hydro_std::request_response::join_responses;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12
13use super::paxos_with_client::PaxosLike;
14
15#[derive(Serialize, Deserialize, Clone)]
16pub struct Proposer {}
17pub struct Acceptor {}
18
19#[derive(Clone, Copy)]
20pub struct PaxosConfig {
21 pub f: usize,
23 pub i_am_leader_send_timeout: u64,
25 pub i_am_leader_check_timeout: u64,
27 pub i_am_leader_check_timeout_delay_multiplier: usize,
29}
30
31pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
32impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}
33
34#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug, Hash)]
35pub struct Ballot {
36 pub num: u32,
37 pub proposer_id: MemberId<Proposer>,
38}
39
40impl Ord for Ballot {
41 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
42 self.num
43 .cmp(&other.num)
44 .then_with(|| self.proposer_id.raw_id.cmp(&other.proposer_id.raw_id))
45 }
46}
47
48impl PartialOrd for Ballot {
49 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
50 Some(self.cmp(other))
51 }
52}
53
54#[derive(Serialize, Deserialize, Clone, Debug)]
55pub struct LogValue<P> {
56 pub ballot: Ballot,
57 pub value: Option<P>, }
59
60#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
61pub struct P2a<P, S> {
62 pub sender: MemberId<S>,
63 pub ballot: Ballot,
64 pub slot: usize,
65 pub value: Option<P>, }
67
68pub struct CorePaxos<'a> {
69 pub proposers: Cluster<'a, Proposer>,
70 pub acceptors: Cluster<'a, Acceptor>,
71 pub paxos_config: PaxosConfig,
72}
73
74impl<'a> PaxosLike<'a> for CorePaxos<'a> {
75 type PaxosIn = Proposer;
76 type PaxosLog = Acceptor;
77 type PaxosOut = Proposer;
78 type Ballot = Ballot;
79
80 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
81 &self.proposers
82 }
83
84 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
85 &self.acceptors
86 }
87
88 fn get_recipient_from_ballot<L: Location<'a>>(
89 ballot: Optional<Self::Ballot, L, Unbounded>,
90 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
91 ballot.map(q!(|ballot| ballot.proposer_id))
92 }
93
94 fn build<P: PaxosPayload>(
95 self,
96 with_ballot: impl FnOnce(
97 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
98 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
99 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
100 nondet_leader: NonDet,
101 nondet_commit: NonDet,
102 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
103 paxos_core(
104 &self.proposers,
105 &self.acceptors,
106 a_checkpoint,
107 with_ballot,
108 self.paxos_config,
109 nondet_leader,
110 nondet_commit,
111 )
112 .1
113 }
114}
115
116#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
133pub fn paxos_core<'a, P: PaxosPayload>(
134 proposers: &Cluster<'a, Proposer>,
135 acceptors: &Cluster<'a, Acceptor>,
136 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
137 c_to_proposers: impl FnOnce(
138 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
139 ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
140 config: PaxosConfig,
141 nondet_leader: NonDet,
142 nondet_commit: NonDet,
143) -> (
144 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
145 Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
146) {
147 let f = config.f;
148
149 proposers
150 .source_iter(q!(["Proposers say hello"]))
151 .for_each(q!(|s| println!("{}", s)));
152
153 acceptors
154 .source_iter(q!(["Acceptors say hello"]))
155 .for_each(q!(|s| println!("{}", s)));
156
157 let proposer_tick = proposers.tick();
158 let acceptor_tick = acceptors.tick();
159
160 let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
161 proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
162 let (a_log_complete_cycle, a_log_forward_reference) =
163 acceptor_tick.forward_ref::<Singleton<_, _, _>>();
164
165 let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
166 proposers,
167 acceptors,
168 &proposer_tick,
169 &acceptor_tick,
170 f + 1,
171 2 * f + 1,
172 config,
173 sequencing_max_ballot_forward_reference,
174 a_log_forward_reference,
175 nondet!(
176 nondet_leader
180 ),
181 nondet!(
182 nondet_commit
185 ),
186 );
187
188 let just_became_leader = p_is_leader
189 .clone()
190 .continue_unless(p_is_leader.clone().defer_tick());
191
192 let c_to_proposers = c_to_proposers(
193 just_became_leader
194 .clone()
195 .then(p_ballot.clone())
196 .all_ticks(),
197 );
198
199 let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
200 proposers,
201 acceptors,
202 &proposer_tick,
203 &acceptor_tick,
204 c_to_proposers,
205 a_checkpoint,
206 p_ballot.clone(),
207 p_is_leader,
208 p_relevant_p1bs,
209 f,
210 a_max_ballot,
211 nondet!(
212 nondet_commit
217 ),
218 );
219
220 a_log_complete_cycle.complete(a_log.snapshot(nondet!(
221 )));
225 sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
226
227 (
228 just_became_leader.then(p_ballot).all_ticks(),
230 p_to_replicas,
231 )
232}
233
234#[expect(
235 clippy::type_complexity,
236 clippy::too_many_arguments,
237 reason = "internal paxos code // TODO"
238)]
239pub fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
240 proposers: &Cluster<'a, Proposer>,
241 acceptors: &Cluster<'a, Acceptor>,
242 proposer_tick: &Tick<Cluster<'a, Proposer>>,
243 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
244 quorum_size: usize,
245 num_quorum_participants: usize,
246 paxos_config: PaxosConfig,
247 p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
248 a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
249 nondet_leader: NonDet,
250 nondet_acceptor_ballot: NonDet,
251) -> (
252 Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
253 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
254 Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
255 Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
256) {
257 let (p1b_fail_complete, p1b_fail) =
258 proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
259 let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
260 proposers.forward_ref::<Stream<_, _, _, NoOrder, AtLeastOnce>>();
261 let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
262 proposer_tick.forward_ref::<Optional<(), _, _>>();
263 let p_received_max_ballot = p1b_fail
268 .interleave(p_received_p2b_ballots)
269 .interleave(p_to_proposers_i_am_leader_forward_ref)
270 .max()
271 .unwrap_or(proposers.singleton(q!(Ballot {
272 num: 0,
273 proposer_id: MemberId::from_raw(0)
274 })));
275
276 let (p_ballot, p_has_largest_ballot) = p_ballot_calc(
277 proposer_tick,
278 p_received_max_ballot.snapshot(
279 proposer_tick,
280 nondet!(
281 nondet_leader
284 ),
285 ),
286 );
287
288 let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
289 proposers,
290 proposer_tick,
291 p_is_leader_forward_ref,
292 p_ballot.clone(),
293 paxos_config,
294 nondet!(
295 nondet_leader
298 ),
299 );
300
301 p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);
302
303 let p_to_acceptors_p1a = p_trigger_election
304 .then(p_ballot.clone())
305 .all_ticks()
306 .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
307 .broadcast_bincode(acceptors, nondet!())
308 .values();
309
310 let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
311 acceptor_tick,
312 p_to_acceptors_p1a.batch(
313 acceptor_tick,
314 nondet!(
315 nondet_acceptor_ballot
319 ),
320 ),
321 a_log,
322 proposers,
323 );
324
325 let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
326 proposer_tick,
327 a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
328 p_ballot.clone(),
329 p_has_largest_ballot,
330 quorum_size,
331 num_quorum_participants,
332 );
333 p_is_leader_complete_cycle.complete(p_is_leader.clone());
334 p1b_fail_complete.complete(fail_ballots);
335
336 (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
337}
338
339#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
341fn p_ballot_calc<'a>(
342 proposer_tick: &Tick<Cluster<'a, Proposer>>,
343 p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
344) -> (
345 Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
346 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
347) {
348 let (p_ballot_num_complete_cycle, p_ballot_num) =
349 proposer_tick.cycle_with_initial(proposer_tick.singleton(q!(0)));
350
351 let p_new_ballot_num = p_received_max_ballot
352 .clone()
353 .zip(p_ballot_num.clone())
354 .map(q!(move |(received_max_ballot, ballot_num)| {
355 if received_max_ballot
356 > (Ballot {
357 num: ballot_num,
358 proposer_id: CLUSTER_SELF_ID,
359 })
360 {
361 received_max_ballot.num + 1
362 } else {
363 ballot_num
364 }
365 }));
366 p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num);
367
368 let p_ballot = p_ballot_num.map(q!(move |num| Ballot {
369 num,
370 proposer_id: CLUSTER_SELF_ID
371 }));
372
373 let p_has_largest_ballot = p_received_max_ballot
374 .clone()
375 .zip(p_ballot.clone())
376 .filter(q!(
377 |(received_max_ballot, cur_ballot)| *received_max_ballot <= *cur_ballot
378 ))
379 .map(q!(|_| ()));
380
381 (p_ballot, p_has_largest_ballot)
383}
384
385#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
386fn p_leader_heartbeat<'a>(
387 proposers: &Cluster<'a, Proposer>,
388 proposer_tick: &Tick<Cluster<'a, Proposer>>,
389 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
390 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
391 paxos_config: PaxosConfig,
392 nondet_reelection: NonDet,
393) -> (
394 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder, AtLeastOnce>,
395 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
396) {
397 let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
398 let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
399 let i_am_leader_check_timeout_delay_multiplier =
400 paxos_config.i_am_leader_check_timeout_delay_multiplier;
401
402 let p_to_proposers_i_am_leader = p_is_leader
403 .clone()
404 .then(p_ballot)
405 .latest()
406 .sample_every(
407 q!(Duration::from_secs(i_am_leader_send_timeout)),
408 nondet!(
409 nondet_reelection
413 ),
414 )
415 .broadcast_bincode(proposers, nondet!())
416 .values();
417
418 let p_leader_expired = p_to_proposers_i_am_leader
419 .clone()
420 .timeout(
421 q!(Duration::from_secs(i_am_leader_check_timeout)),
422 nondet!(
423 nondet_reelection
427 ),
428 )
429 .snapshot(proposer_tick, nondet!())
430 .continue_unless(p_is_leader);
431
432 let p_trigger_election = p_leader_expired.continue_if(
434 proposers
435 .source_interval_delayed(
436 q!(Duration::from_secs(
437 (CLUSTER_SELF_ID.raw_id * i_am_leader_check_timeout_delay_multiplier as u32)
438 .into()
439 )),
440 q!(Duration::from_secs(i_am_leader_check_timeout)),
441 nondet!(
442 nondet_reelection
446 ),
447 )
448 .batch(proposer_tick, nondet!())
449 .first(),
450 );
451 (p_to_proposers_i_am_leader, p_trigger_election)
452}
453
454#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
455fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
456 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
457 p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
458 a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
459 proposers: &Cluster<'a, Proposer>,
460) -> (
461 Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
462 Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
463) {
464 let a_max_ballot = p_to_acceptors_p1a
465 .clone()
466 .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
467 .persist()
468 .max()
469 .unwrap_or(acceptor_tick.singleton(q!(Ballot {
470 num: 0,
471 proposer_id: MemberId::from_raw(0)
472 })));
473
474 (
475 a_max_ballot.clone(),
476 p_to_acceptors_p1a
477 .cross_singleton(a_max_ballot)
478 .cross_singleton(a_log)
479 .map(q!(|((ballot, max_ballot), log)| (
480 ballot.proposer_id,
481 (
482 ballot,
483 if ballot == max_ballot {
484 Ok(log)
485 } else {
486 Err(max_ballot)
487 }
488 )
489 )))
490 .all_ticks()
491 .demux_bincode(proposers)
492 .values(),
493 )
494}
495
496#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
498fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
499 proposer_tick: &Tick<Cluster<'a, Proposer>>,
500 a_to_proposers_p1b: Stream<
501 (Ballot, Result<(Option<usize>, P), Ballot>),
502 Cluster<'a, Proposer>,
503 Unbounded,
504 NoOrder,
505 >,
506 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
507 p_has_largest_ballot: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
508 quorum_size: usize,
509 num_quorum_participants: usize,
510) -> (
511 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
512 Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
513 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
514) {
515 let (quorums, fails) = collect_quorum_with_response(
516 a_to_proposers_p1b.atomic(proposer_tick),
517 quorum_size,
518 num_quorum_participants,
519 );
520
521 let p_received_quorum_of_p1bs = quorums
522 .into_keyed()
523 .assume_ordering::<TotalOrder>(nondet!(
524 ))
526 .fold(
527 q!(|| vec![]),
528 q!(|logs, log| {
529 logs.push(log);
530 }),
531 )
532 .snapshot(nondet!())
533 .entries()
534 .max_by_key(q!(|t| t.0))
535 .zip(p_ballot.clone())
536 .filter_map(q!(
537 move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
538 Some(quorum_accepted)
539 } else {
540 None
541 }
542 ));
543
544 let p_is_leader = p_received_quorum_of_p1bs
545 .clone()
546 .map(q!(|_| ()))
547 .continue_if(p_has_largest_ballot.clone());
548
549 (
550 p_is_leader,
551 p_received_quorum_of_p1bs.flatten_unordered(),
553 fails.end_atomic().map(q!(|(_, ballot)| ballot)),
554 )
555}
556
557#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
558pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
559 accepted_logs: Stream<
560 (Option<usize>, HashMap<usize, LogValue<P>>),
561 Tick<Cluster<'a, Proposer>>,
562 Bounded,
563 NoOrder,
564 >,
565 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
566 f: usize,
567) -> (
568 Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
569 Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
570) {
571 let p_p1b_max_checkpoint = accepted_logs
572 .clone()
573 .filter_map(q!(|(checkpoint, _log)| checkpoint))
574 .max()
575 .into_singleton();
576 let p_p1b_highest_entries_and_count = accepted_logs
577 .map(q!(|(_checkpoint, log)| log))
578 .flatten_unordered() .into_keyed()
580 .fold_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
581 if let Some(curr_entry_payload) = &mut curr_entry.1 {
582 let same_values = new_entry.value == curr_entry_payload.value;
583 let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
584 if same_values {
586 curr_entry.0 += 1;
587 }
588 if higher_ballot {
590 curr_entry_payload.ballot = new_entry.ballot;
591 if !same_values {
593 curr_entry.0 = 1;
594 curr_entry_payload.value = new_entry.value;
595 }
596 }
597 } else {
598 *curr_entry = (1, Some(new_entry));
599 }
600 }))
601 .map(q!(|(count, entry)| (count, entry.unwrap())));
602 let p_log_to_try_commit = p_p1b_highest_entries_and_count
603 .clone()
604 .entries()
605 .cross_singleton(p_ballot.clone())
606 .cross_singleton(p_p1b_max_checkpoint.clone())
607 .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
608 if count > f {
609 return None;
610 } else if let Some(checkpoint) = checkpoint
611 && slot <= checkpoint
612 {
613 return None;
614 }
615 Some(((slot, ballot), entry.value))
616 }));
617 let p_max_slot = p_p1b_highest_entries_and_count.clone().keys().max();
618 let p_proposed_slots = p_p1b_highest_entries_and_count.clone().keys();
619 let p_log_holes = p_max_slot
620 .clone()
621 .zip(p_p1b_max_checkpoint)
622 .flat_map_ordered(q!(|(max_slot, checkpoint)| {
623 if let Some(checkpoint) = checkpoint {
624 (checkpoint + 1)..max_slot
625 } else {
626 0..max_slot
627 }
628 }))
629 .filter_not_in(p_proposed_slots)
630 .cross_singleton(p_ballot.clone())
631 .map(q!(move |(slot, ballot)| ((slot, ballot), None)));
632
633 (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
634}
635
636#[expect(
637 clippy::type_complexity,
638 clippy::too_many_arguments,
639 reason = "internal paxos code // TODO"
640)]
641fn sequence_payload<'a, P: PaxosPayload>(
642 proposers: &Cluster<'a, Proposer>,
643 acceptors: &Cluster<'a, Acceptor>,
644 proposer_tick: &Tick<Cluster<'a, Proposer>>,
645 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
646 c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
647 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
648
649 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
650 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
651
652 p_relevant_p1bs: Stream<
653 (Option<usize>, HashMap<usize, LogValue<P>>),
654 Tick<Cluster<'a, Proposer>>,
655 Bounded,
656 NoOrder,
657 >,
658 f: usize,
659
660 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
661
662 nondet_commit: NonDet,
663) -> (
664 Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
665 Singleton<
666 (Option<usize>, HashMap<usize, LogValue<P>>),
667 Atomic<Cluster<'a, Acceptor>>,
668 Unbounded,
669 >,
670 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
671) {
672 let (p_log_to_recommit, p_max_slot) =
673 recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);
674
675 let indexed_payloads = index_payloads(
676 proposer_tick,
677 p_max_slot,
678 c_to_proposers
679 .batch(
680 proposer_tick,
681 nondet!(
682 nondet_commit
688 ),
689 )
690 .continue_if(p_is_leader.clone()),
691 );
692
693 let payloads_to_send = indexed_payloads
694 .cross_singleton(p_ballot.clone())
695 .map(q!(|((slot, payload), ballot)| (
696 (slot, ballot),
697 Some(payload)
698 )))
699 .chain(p_log_to_recommit)
700 .continue_if(p_is_leader)
701 .all_ticks_atomic();
702
703 let (a_log, a_to_proposers_p2b) = acceptor_p2(
704 acceptor_tick,
705 a_max_ballot.clone(),
706 payloads_to_send
707 .clone()
708 .end_atomic()
709 .map(q!(move |((slot, ballot), value)| P2a {
710 sender: CLUSTER_SELF_ID,
711 ballot,
712 slot,
713 value
714 }))
715 .broadcast_bincode(acceptors, nondet!())
716 .values(),
717 a_checkpoint,
718 proposers,
719 );
720
721 let (quorums, fails) =
723 collect_quorum(a_to_proposers_p2b.atomic(proposer_tick), f + 1, 2 * f + 1);
724
725 let p_to_replicas = join_responses(
726 proposer_tick,
727 quorums.map(q!(|k| (k, ()))),
728 payloads_to_send.batch(nondet!(
729 )),
732 );
733
734 (
735 p_to_replicas
736 .end_atomic()
737 .map(q!(|((slot, _ballot), (value, _))| (slot, value))),
738 a_log,
739 fails.end_atomic().map(q!(|(_, ballot)| ballot)),
740 )
741}
742
743pub fn index_payloads<'a, P: PaxosPayload>(
745 proposer_tick: &Tick<Cluster<'a, Proposer>>,
746 p_max_slot: Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
747 c_to_proposers: Stream<P, Tick<Cluster<'a, Proposer>>, Bounded>,
748) -> Stream<(usize, P), Tick<Cluster<'a, Proposer>>, Bounded> {
749 let (p_next_slot_complete_cycle, p_next_slot) =
750 proposer_tick.cycle_with_initial::<Singleton<usize, _, _>>(proposer_tick.singleton(q!(0)));
751 let p_next_slot_after_reconciling_p1bs = p_max_slot.map(q!(|max_slot| max_slot + 1));
752
753 let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot);
754
755 let p_indexed_payloads = c_to_proposers
756 .enumerate()
757 .cross_singleton(base_slot.clone())
758 .map(q!(|((index, payload), base_slot)| (
759 base_slot + index,
760 payload
761 )));
762
763 let p_num_payloads = p_indexed_payloads.clone().count();
764 let p_next_slot_after_sending_payloads =
765 p_num_payloads
766 .clone()
767 .zip(base_slot)
768 .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));
769
770 p_next_slot_complete_cycle.complete_next_tick(p_next_slot_after_sending_payloads);
771 p_indexed_payloads
772}
773
774#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
775pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
776 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
777 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
778 p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
779 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
780 proposers: &Cluster<'a, S>,
781) -> (
782 Singleton<
783 (Option<usize>, HashMap<usize, LogValue<P>>),
784 Atomic<Cluster<'a, Acceptor>>,
785 Unbounded,
786 >,
787 Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
788) {
789 let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.batch(
790 acceptor_tick,
791 nondet!(
792 ),
797 );
798
799 let a_checkpoint = a_checkpoint.snapshot(
800 acceptor_tick,
801 nondet!(
802 ),
805 );
806 let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
809 .clone()
810 .cross_singleton(a_max_ballot.clone()) .filter_map(q!(|(p2a, max_ballot)|
812 if p2a.ballot >= max_ballot {
813 Some((p2a.slot, LogValue {
814 ballot: p2a.ballot,
815 value: p2a.value,
816 }))
817 } else {
818 None
819 }
820 ));
821 let a_log = a_p2as_to_place_in_log
822 .all_ticks_atomic()
823 .into_keyed()
824 .reduce_watermark_commutative(
825 a_checkpoint.clone(),
826 q!(|prev_entry, entry| {
827 if entry.ballot > prev_entry.ballot {
829 *prev_entry = LogValue {
830 ballot: entry.ballot,
831 value: entry.value,
832 };
833 }
834 }),
835 );
836 let a_log_snapshot = a_log
837 .snapshot(nondet!(
838 ))
841 .entries()
842 .fold_commutative(
843 q!(|| HashMap::new()),
844 q!(|map, (slot, entry)| {
845 map.insert(slot, entry);
846 }),
847 );
848
849 let a_to_proposers_p2b = p_to_acceptors_p2a_batch
850 .cross_singleton(a_max_ballot)
851 .map(q!(|(p2a, max_ballot)| (
852 p2a.sender,
853 (
854 (p2a.slot, p2a.ballot),
855 if p2a.ballot == max_ballot {
856 Ok(())
857 } else {
858 Err(max_ballot)
859 }
860 )
861 )))
862 .all_ticks()
863 .demux_bincode(proposers)
864 .values();
865
866 (
867 a_checkpoint
868 .into_singleton()
869 .zip(a_log_snapshot)
870 .latest_atomic(),
871 a_to_proposers_p2b,
872 )
873}