1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::time::Duration;
5
6use hydro_lang::*;
7use hydro_std::quorum::{collect_quorum, collect_quorum_with_response};
8use hydro_std::request_response::join_responses;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11
12use super::paxos_with_client::PaxosLike;
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct Proposer {}
/// Marker type identifying the cluster of Paxos acceptors.
///
/// It has no fields; in this file it is only used as a type-level tag, for
/// example in `Cluster<'a, Acceptor>`.
pub struct Acceptor {}
17
/// Tuning parameters for the Paxos implementation in this module.
///
/// Every timeout is a whole number of seconds: each value is handed to
/// `Duration::from_secs` where it is used.
#[derive(Clone, Copy, Debug)]
pub struct PaxosConfig {
    /// Fault-tolerance parameter. Quorums are `f + 1` responses out of
    /// `2 * f + 1` participants.
    pub f: usize,
    /// Interval, in seconds, at which a leader's current ballot is sampled
    /// and broadcast to the other proposers as an "I am leader" heartbeat.
    pub i_am_leader_send_timeout: u64,
    /// Heartbeat timeout, in seconds. Also used as the period of the
    /// interval on which a proposer is allowed to trigger an election.
    pub i_am_leader_check_timeout: u64,
    /// Multiplied by the proposer's raw cluster ID to obtain the initial
    /// delay, in seconds, before that proposer's first election check.
    pub i_am_leader_check_timeout_delay_multiplier: usize,
}
29
30pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
31impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}
32
33#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug, Hash)]
34pub struct Ballot {
35 pub num: u32,
36 pub proposer_id: ClusterId<Proposer>,
37}
38
39impl Ord for Ballot {
40 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
41 self.num
42 .cmp(&other.num)
43 .then_with(|| self.proposer_id.raw_id.cmp(&other.proposer_id.raw_id))
44 }
45}
46
47impl PartialOrd for Ballot {
48 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
49 Some(self.cmp(other))
50 }
51}
52
53#[derive(Serialize, Deserialize, Clone, Debug)]
54pub struct LogValue<P> {
55 pub ballot: Ballot,
56 pub value: Option<P>, }
58
59#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
60pub struct P2a<P, S> {
61 pub sender: ClusterId<S>,
62 pub ballot: Ballot,
63 pub slot: usize,
64 pub value: Option<P>, }
66
67pub struct CorePaxos<'a> {
68 pub proposers: Cluster<'a, Proposer>,
69 pub acceptors: Cluster<'a, Acceptor>,
70 pub paxos_config: PaxosConfig,
71}
72
73impl<'a> PaxosLike<'a> for CorePaxos<'a> {
74 type PaxosIn = Proposer;
75 type PaxosLog = Acceptor;
76 type PaxosOut = Proposer;
77 type Ballot = Ballot;
78
79 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
80 &self.proposers
81 }
82
83 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
84 &self.acceptors
85 }
86
87 fn get_recipient_from_ballot<L: Location<'a>>(
88 ballot: Optional<Self::Ballot, L, Unbounded>,
89 ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded> {
90 ballot.map(q!(|ballot| ballot.proposer_id))
91 }
92
93 unsafe fn build<P: PaxosPayload>(
94 self,
95 with_ballot: impl FnOnce(
96 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
97 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
98 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
99 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
100 unsafe {
101 paxos_core(
102 &self.proposers,
103 &self.acceptors,
104 a_checkpoint,
105 with_ballot,
106 self.paxos_config,
107 )
108 .1
109 }
110 }
111}
112
113#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
130pub unsafe fn paxos_core<'a, P: PaxosPayload>(
131 proposers: &Cluster<'a, Proposer>,
132 acceptors: &Cluster<'a, Acceptor>,
133 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
134 c_to_proposers: impl FnOnce(
135 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
136 ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
137 config: PaxosConfig,
138) -> (
139 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
140 Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
141) {
142 let f = config.f;
143
144 proposers
145 .source_iter(q!(["Proposers say hello"]))
146 .for_each(q!(|s| println!("{}", s)));
147
148 acceptors
149 .source_iter(q!(["Acceptors say hello"]))
150 .for_each(q!(|s| println!("{}", s)));
151
152 let proposer_tick = proposers.tick();
153 let acceptor_tick = acceptors.tick();
154
155 let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
156 proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
157 let (a_log_complete_cycle, a_log_forward_reference) =
158 acceptor_tick.forward_ref::<Singleton<_, _, _>>();
159
160 let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe {
161 leader_election(
167 proposers,
168 acceptors,
169 &proposer_tick,
170 &acceptor_tick,
171 f + 1,
172 2 * f + 1,
173 config,
174 sequencing_max_ballot_forward_reference,
175 a_log_forward_reference,
176 )
177 };
178
179 let just_became_leader = p_is_leader
180 .clone()
181 .continue_unless(p_is_leader.clone().defer_tick());
182
183 let c_to_proposers = c_to_proposers(
184 just_became_leader
185 .clone()
186 .then(p_ballot.clone())
187 .all_ticks(),
188 );
189
190 let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe {
191 sequence_payload(
196 proposers,
197 acceptors,
198 &proposer_tick,
199 &acceptor_tick,
200 c_to_proposers,
201 a_checkpoint,
202 p_ballot.clone(),
203 p_is_leader,
204 p_relevant_p1bs,
205 f,
206 a_max_ballot,
207 )
208 };
209
210 a_log_complete_cycle.complete(unsafe {
211 a_log.latest_tick()
215 });
216 sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
217
218 (
219 just_became_leader.then(p_ballot).all_ticks(),
221 p_to_replicas,
222 )
223}
224
225#[expect(
226 clippy::type_complexity,
227 clippy::too_many_arguments,
228 clippy::missing_safety_doc,
229 reason = "internal paxos code // TODO"
230)]
231pub unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
232 proposers: &Cluster<'a, Proposer>,
233 acceptors: &Cluster<'a, Acceptor>,
234 proposer_tick: &Tick<Cluster<'a, Proposer>>,
235 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
236 quorum_size: usize,
237 num_quorum_participants: usize,
238 paxos_config: PaxosConfig,
239 p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
240 a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
241) -> (
242 Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
243 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
244 Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
245 Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
246) {
247 let (p1b_fail_complete, p1b_fail) =
248 proposers.forward_ref::<Stream<Ballot, _, Unbounded, NoOrder>>();
249 let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
250 proposers.forward_ref::<Stream<_, _, _, NoOrder>>();
251 let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
252 proposer_tick.forward_ref::<Optional<(), _, _>>();
253 let p_received_max_ballot = p1b_fail
258 .union(p_received_p2b_ballots)
259 .union(p_to_proposers_i_am_leader_forward_ref)
260 .max()
261 .unwrap_or(proposers.singleton(q!(Ballot {
262 num: 0,
263 proposer_id: ClusterId::from_raw(0)
264 })));
265
266 let (p_ballot, p_has_largest_ballot) = p_ballot_calc(proposer_tick, unsafe {
267 p_received_max_ballot.latest_tick(proposer_tick)
270 });
271
272 let (p_to_proposers_i_am_leader, p_trigger_election) = unsafe {
273 p_leader_heartbeat(
276 proposers,
277 proposer_tick,
278 p_is_leader_forward_ref,
279 p_ballot.clone(),
280 paxos_config,
281 )
282 };
283
284 p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);
285
286 let p_to_acceptors_p1a = p_trigger_election
287 .then(p_ballot.clone())
288 .all_ticks()
289 .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
290 .broadcast_bincode_anonymous(acceptors);
291
292 let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(
293 acceptor_tick,
294 unsafe {
295 p_to_acceptors_p1a.tick_batch(acceptor_tick)
299 },
300 a_log,
301 proposers,
302 );
303
304 let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b(
305 proposer_tick,
306 a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
307 p_ballot.clone(),
308 p_has_largest_ballot,
309 quorum_size,
310 num_quorum_participants,
311 );
312 p_is_leader_complete_cycle.complete(p_is_leader.clone());
313 p1b_fail_complete.complete(fail_ballots.end_atomic());
314
315 (p_ballot, p_is_leader, p_accepted_values, a_max_ballot)
316}
317
318#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
320fn p_ballot_calc<'a>(
321 proposer_tick: &Tick<Cluster<'a, Proposer>>,
322 p_received_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
323) -> (
324 Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
325 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
326) {
327 let (p_ballot_num_complete_cycle, p_ballot_num) =
328 proposer_tick.cycle_with_initial(proposer_tick.singleton(q!(0)));
329
330 let p_new_ballot_num = p_received_max_ballot
331 .clone()
332 .zip(p_ballot_num.clone())
333 .map(q!(move |(received_max_ballot, ballot_num)| {
334 if received_max_ballot
335 > (Ballot {
336 num: ballot_num,
337 proposer_id: CLUSTER_SELF_ID,
338 })
339 {
340 received_max_ballot.num + 1
341 } else {
342 ballot_num
343 }
344 }));
345 p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num);
346
347 let p_ballot = p_ballot_num.map(q!(move |num| Ballot {
348 num,
349 proposer_id: CLUSTER_SELF_ID
350 }));
351
352 let p_has_largest_ballot = p_received_max_ballot
353 .clone()
354 .zip(p_ballot.clone())
355 .filter(q!(
356 |(received_max_ballot, cur_ballot)| *received_max_ballot <= *cur_ballot
357 ))
358 .map(q!(|_| ()));
359
360 (p_ballot, p_has_largest_ballot)
362}
363
364#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
365unsafe fn p_leader_heartbeat<'a>(
366 proposers: &Cluster<'a, Proposer>,
367 proposer_tick: &Tick<Cluster<'a, Proposer>>,
368 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
369 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
370 paxos_config: PaxosConfig,
371) -> (
372 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
373 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
374) {
375 let i_am_leader_send_timeout = paxos_config.i_am_leader_send_timeout;
376 let i_am_leader_check_timeout = paxos_config.i_am_leader_check_timeout;
377 let i_am_leader_check_timeout_delay_multiplier =
378 paxos_config.i_am_leader_check_timeout_delay_multiplier;
379
380 let p_to_proposers_i_am_leader = unsafe {
381 p_is_leader
385 .clone()
386 .then(p_ballot)
387 .latest()
388 .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
389 }
390 .broadcast_bincode_anonymous(proposers);
391
392 let p_leader_expired = unsafe {
393 p_to_proposers_i_am_leader
397 .clone()
398 .timeout(q!(Duration::from_secs(i_am_leader_check_timeout)))
399 .latest_tick(proposer_tick)
400 .continue_unless(p_is_leader)
401 };
402
403 let p_trigger_election = unsafe {
405 p_leader_expired.continue_if(
409 proposers
410 .source_interval_delayed(
411 q!(Duration::from_secs(
412 (CLUSTER_SELF_ID.raw_id
413 * i_am_leader_check_timeout_delay_multiplier as u32)
414 .into()
415 )),
416 q!(Duration::from_secs(i_am_leader_check_timeout)),
417 )
418 .tick_batch(proposer_tick)
419 .first(),
420 )
421 };
422 (p_to_proposers_i_am_leader, p_trigger_election)
423}
424
425#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
426fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
427 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
428 p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
429 a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
430 proposers: &Cluster<'a, Proposer>,
431) -> (
432 Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
433 Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
434) {
435 let a_max_ballot = p_to_acceptors_p1a
436 .clone()
437 .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
438 .persist()
439 .max()
440 .unwrap_or(acceptor_tick.singleton(q!(Ballot {
441 num: 0,
442 proposer_id: ClusterId::from_raw(0)
443 })));
444
445 (
446 a_max_ballot.clone(),
447 p_to_acceptors_p1a
448 .cross_singleton(a_max_ballot)
449 .cross_singleton(a_log)
450 .map(q!(|((ballot, max_ballot), log)| (
451 ballot.proposer_id,
452 (
453 ballot,
454 if ballot == max_ballot {
455 Ok(log)
456 } else {
457 Err(max_ballot)
458 }
459 )
460 )))
461 .all_ticks()
462 .send_bincode_anonymous(proposers),
463 )
464}
465
466#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
468fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
469 proposer_tick: &Tick<Cluster<'a, Proposer>>,
470 a_to_proposers_p1b: Stream<
471 (Ballot, Result<(Option<usize>, P), Ballot>),
472 Cluster<'a, Proposer>,
473 Unbounded,
474 NoOrder,
475 >,
476 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
477 p_has_largest_ballot: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
478 quorum_size: usize,
479 num_quorum_participants: usize,
480) -> (
481 Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
482 Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
483 Stream<Ballot, Atomic<Cluster<'a, Proposer>>, Unbounded, NoOrder>,
484) {
485 let (quorums, fails) = collect_quorum_with_response(
486 a_to_proposers_p1b.atomic(proposer_tick),
487 quorum_size,
488 num_quorum_participants,
489 );
490
491 let p_received_quorum_of_p1bs = unsafe {
492 quorums.tick_batch()
495 }
496 .persist()
497 .fold_keyed_commutative(
498 q!(|| vec![]),
499 q!(|logs, log| {
500 logs.push(log);
502 }),
503 )
504 .max_by_key(q!(|t| t.0))
505 .zip(p_ballot.clone())
506 .filter_map(q!(
507 move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot {
508 Some(quorum_accepted)
509 } else {
510 None
511 }
512 ));
513
514 let p_is_leader = p_received_quorum_of_p1bs
515 .clone()
516 .map(q!(|_| ()))
517 .continue_if(p_has_largest_ballot.clone());
518
519 (
520 p_is_leader,
521 p_received_quorum_of_p1bs.flatten_unordered(),
523 fails.map(q!(|(_, ballot)| ballot)),
524 )
525}
526
527#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
528pub fn recommit_after_leader_election<'a, P: PaxosPayload>(
529 accepted_logs: Stream<
530 (Option<usize>, HashMap<usize, LogValue<P>>),
531 Tick<Cluster<'a, Proposer>>,
532 Bounded,
533 NoOrder,
534 >,
535 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
536 f: usize,
537) -> (
538 Stream<((usize, Ballot), Option<P>), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
539 Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
540) {
541 let p_p1b_max_checkpoint = accepted_logs
542 .clone()
543 .filter_map(q!(|(checkpoint, _log)| checkpoint))
544 .max()
545 .into_singleton();
546 let p_p1b_highest_entries_and_count = accepted_logs
547 .map(q!(|(_checkpoint, log)| log))
548 .flatten_unordered() .fold_keyed_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
550 if let Some(curr_entry_payload) = &mut curr_entry.1 {
551 let same_values = new_entry.value == curr_entry_payload.value;
552 let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
553 if same_values {
555 curr_entry.0 += 1;
556 }
557 if higher_ballot {
559 curr_entry_payload.ballot = new_entry.ballot;
560 if !same_values {
562 curr_entry.0 = 1;
563 curr_entry_payload.value = new_entry.value;
564 }
565 }
566 } else {
567 *curr_entry = (1, Some(new_entry));
568 }
569 }))
570 .map(q!(|(slot, (count, entry))| (slot, (count, entry.unwrap()))));
571 let p_log_to_try_commit = p_p1b_highest_entries_and_count
572 .clone()
573 .cross_singleton(p_ballot.clone())
574 .cross_singleton(p_p1b_max_checkpoint.clone())
575 .filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
576 if count > f {
577 return None;
578 } else if let Some(checkpoint) = checkpoint {
579 if slot <= checkpoint {
580 return None;
581 }
582 }
583 Some(((slot, ballot), entry.value))
584 }));
585 let p_max_slot = p_p1b_highest_entries_and_count
586 .clone()
587 .map(q!(|(slot, _)| slot))
588 .max();
589 let p_proposed_slots = p_p1b_highest_entries_and_count
590 .clone()
591 .map(q!(|(slot, _)| slot));
592 let p_log_holes = p_max_slot
593 .clone()
594 .zip(p_p1b_max_checkpoint)
595 .flat_map_ordered(q!(|(max_slot, checkpoint)| {
596 if let Some(checkpoint) = checkpoint {
597 (checkpoint + 1)..max_slot
598 } else {
599 0..max_slot
600 }
601 }))
602 .filter_not_in(p_proposed_slots)
603 .cross_singleton(p_ballot.clone())
604 .map(q!(move |(slot, ballot)| ((slot, ballot), None)));
605
606 (p_log_to_try_commit.chain(p_log_holes), p_max_slot)
607}
608
609#[expect(
610 clippy::type_complexity,
611 clippy::too_many_arguments,
612 reason = "internal paxos code // TODO"
613)]
614unsafe fn sequence_payload<'a, P: PaxosPayload>(
615 proposers: &Cluster<'a, Proposer>,
616 acceptors: &Cluster<'a, Acceptor>,
617 proposer_tick: &Tick<Cluster<'a, Proposer>>,
618 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
619 c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
620 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
621
622 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
623 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
624
625 p_relevant_p1bs: Stream<
626 (Option<usize>, HashMap<usize, LogValue<P>>),
627 Tick<Cluster<'a, Proposer>>,
628 Bounded,
629 NoOrder,
630 >,
631 f: usize,
632
633 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
634) -> (
635 Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
636 Singleton<
637 (Option<usize>, HashMap<usize, LogValue<P>>),
638 Atomic<Cluster<'a, Acceptor>>,
639 Unbounded,
640 >,
641 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
642) {
643 let (p_log_to_recommit, p_max_slot) =
644 recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);
645
646 let indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
647 c_to_proposers
653 .tick_batch(proposer_tick)
654 .continue_if(p_is_leader.clone())
655 });
656
657 let payloads_to_send = indexed_payloads
658 .cross_singleton(p_ballot.clone())
659 .map(q!(|((slot, payload), ballot)| (
660 (slot, ballot),
661 Some(payload)
662 )))
663 .chain(p_log_to_recommit)
664 .continue_if(p_is_leader)
665 .all_ticks_atomic();
666
667 let (a_log, a_to_proposers_p2b) = acceptor_p2(
668 acceptor_tick,
669 a_max_ballot.clone(),
670 payloads_to_send
671 .clone()
672 .map(q!(move |((slot, ballot), value)| P2a {
673 sender: CLUSTER_SELF_ID,
674 ballot,
675 slot,
676 value
677 }))
678 .broadcast_bincode_anonymous(acceptors),
679 a_checkpoint,
680 proposers,
681 );
682
683 let (quorums, fails) =
685 collect_quorum(a_to_proposers_p2b.atomic(proposer_tick), f + 1, 2 * f + 1);
686
687 let p_to_replicas = join_responses(proposer_tick, quorums.map(q!(|k| (k, ()))), unsafe {
688 payloads_to_send.tick_batch()
691 });
692
693 (
694 p_to_replicas
695 .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
696 .end_atomic(),
697 a_log,
698 fails.map(q!(|(_, ballot)| ballot)).end_atomic(),
699 )
700}
701
702#[derive(Clone)]
703pub enum CheckpointOrP2a<P, S> {
704 Checkpoint(usize),
705 P2a(P2a<P, S>),
706}
707
708pub fn index_payloads<'a, P: PaxosPayload>(
710 proposer_tick: &Tick<Cluster<'a, Proposer>>,
711 p_max_slot: Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
712 c_to_proposers: Stream<P, Tick<Cluster<'a, Proposer>>, Bounded>,
713) -> Stream<(usize, P), Tick<Cluster<'a, Proposer>>, Bounded> {
714 let (p_next_slot_complete_cycle, p_next_slot) =
715 proposer_tick.cycle_with_initial::<Singleton<usize, _, _>>(proposer_tick.singleton(q!(0)));
716 let p_next_slot_after_reconciling_p1bs = p_max_slot.map(q!(|max_slot| max_slot + 1));
717
718 let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot);
719
720 let p_indexed_payloads = c_to_proposers
721 .enumerate()
722 .cross_singleton(base_slot.clone())
723 .map(q!(|((index, payload), base_slot)| (
724 base_slot + index,
725 payload
726 )));
727
728 let p_num_payloads = p_indexed_payloads.clone().count();
729 let p_next_slot_after_sending_payloads =
730 p_num_payloads
731 .clone()
732 .zip(base_slot)
733 .map(q!(|(num_payloads, base_slot)| base_slot + num_payloads));
734
735 p_next_slot_complete_cycle.complete_next_tick(p_next_slot_after_sending_payloads);
736 p_indexed_payloads
737}
738
739#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
740pub fn acceptor_p2<'a, P: PaxosPayload, S: Clone>(
741 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
742 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
743 p_to_acceptors_p2a: Stream<P2a<P, S>, Cluster<'a, Acceptor>, Unbounded, NoOrder>,
744 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
745 proposers: &Cluster<'a, S>,
746) -> (
747 Singleton<
748 (Option<usize>, HashMap<usize, LogValue<P>>),
749 Atomic<Cluster<'a, Acceptor>>,
750 Unbounded,
751 >,
752 Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, S>, Unbounded, NoOrder>,
753) {
754 let p_to_acceptors_p2a_batch = unsafe {
755 p_to_acceptors_p2a.tick_batch(acceptor_tick)
760 };
761
762 let a_new_checkpoint = unsafe {
763 a_checkpoint.latest_tick(acceptor_tick)
766 }
767 .delta()
768 .map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq)));
769 let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
772 .clone()
773 .cross_singleton(a_max_ballot.clone()) .filter_map(q!(|(p2a, max_ballot)|
775 if p2a.ballot >= max_ballot {
776 Some(CheckpointOrP2a::P2a(p2a))
777 } else {
778 None
779 }
780 ));
781 let a_log = a_p2as_to_place_in_log
782 .chain(a_new_checkpoint.into_stream())
783 .all_ticks_atomic()
784 .fold_commutative(
785 q!(|| (None, HashMap::new())),
786 q!(|(prev_checkpoint, log), checkpoint_or_p2a| {
787 match checkpoint_or_p2a {
788 CheckpointOrP2a::Checkpoint(new_checkpoint) => {
789 if prev_checkpoint
790 .map(|prev| new_checkpoint > prev)
791 .unwrap_or(true)
792 {
793 for slot in (prev_checkpoint.unwrap_or(0))..new_checkpoint {
794 log.remove(&slot);
795 }
796
797 *prev_checkpoint = Some(new_checkpoint);
798 }
799 }
800 CheckpointOrP2a::P2a(p2a) => {
801 if prev_checkpoint.map(|prev| p2a.slot > prev).unwrap_or(true)
803 && log
804 .get(&p2a.slot)
805 .map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot)
806 .unwrap_or(true)
807 {
808 log.insert(
809 p2a.slot,
810 LogValue {
811 ballot: p2a.ballot,
812 value: p2a.value,
813 },
814 );
815 }
816 }
817 }
818 }),
819 );
820
821 let a_to_proposers_p2b = p_to_acceptors_p2a_batch
822 .cross_singleton(a_max_ballot)
823 .map(q!(|(p2a, max_ballot)| (
824 p2a.sender,
825 (
826 (p2a.slot, p2a.ballot),
827 if p2a.ballot == max_ballot {
828 Ok(())
829 } else {
830 Err(max_ballot)
831 }
832 )
833 )))
834 .all_ticks()
835 .send_bincode_anonymous(proposers);
836 (a_log, a_to_proposers_p2b)
837}