1use std::fmt::Debug;
23use hydro_lang::*;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
67use super::paxos::PaxosPayload;
89pub trait PaxosLike<'a>: Sized {
10/// The nodes that receive inputs in Paxos. Usually the proposers.
11type PaxosIn: 'a;
1213/// The nodes that store logs in Paxos. Usually the acceptors.
14type PaxosLog: 'a;
1516/// The nodes that output the results of Paxos. Proposers in Paxos, Proxy leaders in Compartmentalized Paxos.
17type PaxosOut: 'a;
18type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;
1920fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;
21fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;
2223fn get_recipient_from_ballot<L: Location<'a>>(
24 ballot: Optional<Self::Ballot, L, Unbounded>,
25 ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded>;
2627/// # Safety
28 /// During leader-reelection, the latest known leader may be stale, which may
29 /// result in non-deterministic dropping of payloads.
30#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
31unsafe fn build<P: PaxosPayload>(
32self,
33 payload_generator: impl FnOnce(
34 Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
35 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
36 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
37 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;
3839/// # Safety
40 /// During leader-reelection, the latest known leader may be stale, which may
41 /// result in non-deterministic dropping of payloads.
42#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
43unsafe fn with_client<C: 'a, P: PaxosPayload>(
44self,
45 clients: &Cluster<'a, C>,
46 payloads: Stream<P, Cluster<'a, C>, Unbounded>,
47 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
48 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
49unsafe {
50// SAFETY: Non-deterministic leader notifications are handled in `cur_leader_id`. We do not
51 // care about the order in which key writes are processed, which is the non-determinism in
52 // `sequenced_payloads`.
53let leaders = self.payload_recipients().clone();
5455self.build(
56move |new_leader_elected| {
57let cur_leader_id = Self::get_recipient_from_ballot(
58 new_leader_elected
59 .broadcast_bincode_anonymous(clients)
60 .inspect(q!(|ballot| println!(
61"Client notified that leader was elected: {:?}",
62 ballot
63 )))
64 .max(),
65 );
6667let payloads_at_proposer = {
68// SAFETY: the risk here is that we send a batch of requests
69 // with a stale leader ID, but because the leader ID comes from the
70 // network there is no way to guarantee that it is up to date. This
71 // is documented non-determinism.
7273let client_tick = clients.tick();
74let payload_batch = payloads.tick_batch(&client_tick);
7576let latest_leader = cur_leader_id.latest_tick(&client_tick);
7778let (unsent_payloads_complete, unsent_payloads) =
79 client_tick.cycle::<Stream<_, _, _, TotalOrder>>();
8081let all_payloads = unsent_payloads.chain(payload_batch);
8283 unsent_payloads_complete.complete_next_tick(
84 all_payloads.clone().continue_unless(latest_leader.clone()),
85 );
8687 all_payloads.cross_singleton(latest_leader).all_ticks()
88 }
89 .map(q!(move |(payload, leader_id)| (leader_id, payload)))
90 .send_bincode_anonymous(&leaders);
9192let payloads_at_proposer = {
93// SAFETY: documented non-determinism in interleaving of client payloads
94payloads_at_proposer.assume_ordering()
95 };
9697 payloads_at_proposer
98 },
99 checkpoints,
100 )
101 }
102 }
103}