hydro_test/cluster/
paxos_with_client.rs

1use std::fmt::Debug;
2
3use hydro_lang::*;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use super::paxos::PaxosPayload;
8
9pub trait PaxosLike<'a>: Sized {
10    /// The nodes that receive inputs in Paxos. Usually the proposers.
11    type PaxosIn: 'a;
12
13    /// The nodes that store logs in Paxos. Usually the acceptors.
14    type PaxosLog: 'a;
15
16    /// The nodes that output the results of Paxos. Proposers in Paxos, Proxy leaders in Compartmentalized Paxos.
17    type PaxosOut: 'a;
18    type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;
19
20    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;
21    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;
22
23    fn get_recipient_from_ballot<L: Location<'a>>(
24        ballot: Optional<Self::Ballot, L, Unbounded>,
25    ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded>;
26
27    /// # Safety
28    /// During leader-reelection, the latest known leader may be stale, which may
29    /// result in non-deterministic dropping of payloads.
30    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
31    unsafe fn build<P: PaxosPayload>(
32        self,
33        payload_generator: impl FnOnce(
34            Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
35        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
36        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
37    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;
38
39    /// # Safety
40    /// During leader-reelection, the latest known leader may be stale, which may
41    /// result in non-deterministic dropping of payloads.
42    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
43    unsafe fn with_client<C: 'a, P: PaxosPayload>(
44        self,
45        clients: &Cluster<'a, C>,
46        payloads: Stream<P, Cluster<'a, C>, Unbounded>,
47        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
48    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
49        unsafe {
50            // SAFETY: Non-deterministic leader notifications are handled in `cur_leader_id`. We do not
51            // care about the order in which key writes are processed, which is the non-determinism in
52            // `sequenced_payloads`.
53            let leaders = self.payload_recipients().clone();
54
55            self.build(
56                move |new_leader_elected| {
57                    let cur_leader_id = Self::get_recipient_from_ballot(
58                        new_leader_elected
59                            .broadcast_bincode_anonymous(clients)
60                            .inspect(q!(|ballot| println!(
61                                "Client notified that leader was elected: {:?}",
62                                ballot
63                            )))
64                            .max(),
65                    );
66
67                    let payloads_at_proposer = {
68                        // SAFETY: the risk here is that we send a batch of requests
69                        // with a stale leader ID, but because the leader ID comes from the
70                        // network there is no way to guarantee that it is up to date. This
71                        // is documented non-determinism.
72
73                        let client_tick = clients.tick();
74                        let payload_batch = payloads.tick_batch(&client_tick);
75
76                        let latest_leader = cur_leader_id.latest_tick(&client_tick);
77
78                        let (unsent_payloads_complete, unsent_payloads) =
79                            client_tick.cycle::<Stream<_, _, _, TotalOrder>>();
80
81                        let all_payloads = unsent_payloads.chain(payload_batch);
82
83                        unsent_payloads_complete.complete_next_tick(
84                            all_payloads.clone().continue_unless(latest_leader.clone()),
85                        );
86
87                        all_payloads.cross_singleton(latest_leader).all_ticks()
88                    }
89                    .map(q!(move |(payload, leader_id)| (leader_id, payload)))
90                    .send_bincode_anonymous(&leaders);
91
92                    let payloads_at_proposer = {
93                        // SAFETY: documented non-determinism in interleaving of client payloads
94                        payloads_at_proposer.assume_ordering()
95                    };
96
97                    payloads_at_proposer
98                },
99                checkpoints,
100            )
101        }
102    }
103}