// hydro_test/cluster/paxos_with_client.rs

1use std::fmt::Debug;
2
3use hydro_lang::location::MemberId;
4use hydro_lang::*;
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use super::paxos::PaxosPayload;
9
/// Abstraction over Paxos-family sequencing protocols.
///
/// Implementors expose the clusters involved in consensus and a `build`
/// method that wires up the core protocol; the provided `with_client`
/// method layers client-side payload submission (with leader tracking and
/// retry-on-no-leader buffering) on top of any implementation.
pub trait PaxosLike<'a>: Sized {
    /// The nodes that receive inputs in Paxos. Usually the proposers.
    type PaxosIn: 'a;

    /// The nodes that store logs in Paxos. Usually the acceptors.
    type PaxosLog: 'a;

    /// The nodes that output the results of Paxos. Proposers in Paxos, Proxy leaders in Compartmentalized Paxos.
    type PaxosOut: 'a;

    /// Ballot identifier for leader election; clients take the maximum
    /// ballot they have observed to determine the current leader.
    type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;

    /// The cluster whose members accept incoming payloads (usually the proposers).
    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;

    /// The cluster whose members persist the replicated log (usually the acceptors).
    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;

    /// Maps a (possibly absent) ballot to the member ID of the payload
    /// recipient that the ballot designates as leader.
    fn get_recipient_from_ballot<L: Location<'a>>(
        ballot: Optional<Self::Ballot, L, Unbounded>,
    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded>;

    /// Wires up the core consensus dataflow.
    ///
    /// `payload_generator` receives the stream of newly-elected ballots (at
    /// the input cluster) and must produce the stream of payloads to
    /// sequence. `checkpoints` lets log stores garbage-collect entries below
    /// the given slot. Returns the committed log as `(slot, payload)` pairs,
    /// where `None` marks a hole/no-op; output order across slots is not
    /// guaranteed (`NoOrder`).
    ///
    /// # Non-Determinism
    /// During leader-reelection, the latest known leader may be stale, which may
    /// result in non-deterministic dropping of payloads.
    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
    fn build<P: PaxosPayload>(
        self,
        payload_generator: impl FnOnce(
            Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
        nondet_leader: NonDet,
        nondet_commit: NonDet,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;

    /// Runs consensus over payloads submitted by a client cluster.
    ///
    /// Each client tracks the current leader (the max ballot it has been
    /// notified of), buffers payloads while no leader is known, and sends
    /// batches to the leader once one is elected. Returns the committed
    /// `(slot, payload)` log at the output cluster, unordered across slots.
    ///
    /// # Non-Determinism
    /// During leader-reelection, the latest known leader may be stale, which may
    /// result in non-deterministic dropping of payloads. Also, payloads across
    /// clients will be arbitrarily interleaved as they arrive at the leader.
    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
    fn with_client<C: 'a, P: PaxosPayload>(
        self,
        clients: &Cluster<'a, C>,
        payloads: Stream<P, Cluster<'a, C>, Unbounded>,
        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
        nondet_commit: NonDet,
        nondet_order: NonDet,
    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
        // Clone the recipient-cluster handle so the `move` closure below can
        // own it without borrowing `self` (which is consumed by `build`).
        let leaders = self.payload_recipients().clone();

        self.build(
            move |new_leader_elected| {
                // Broadcast every newly-elected ballot to all clients; each
                // client keeps the maximum ballot seen so far, which names
                // the member it should currently treat as leader.
                let cur_leader_id = Self::get_recipient_from_ballot(
                    new_leader_elected
                        .broadcast_bincode(clients, nondet!(/** TODO */))
                        .values()
                        .inspect(q!(|ballot| println!(
                            "Client notified that leader was elected: {:?}",
                            ballot
                        )))
                        .max(),
                );

                let payloads_at_proposer = {
                    let client_tick = clients.tick();
                    // Batch incoming payloads per client tick so they can be
                    // paired with a single leader snapshot per tick.
                    let payload_batch = payloads.batch(&client_tick, nondet!(/** see below */));

                    let latest_leader = cur_leader_id.snapshot(
                        &client_tick,
                        nondet!(
                            /// the risk here is that we send a batch of requests
                            /// with a stale leader ID, but because the leader ID comes from the
                            /// network there is no way to guarantee that it is up to date. This
                            /// is documented non-determinism.
                            nondet_commit
                        ),
                    );

                    // Cycle that carries payloads over to the next tick while
                    // no leader is known yet.
                    let (unsent_payloads_complete, unsent_payloads) =
                        client_tick.cycle::<Stream<_, _, _, TotalOrder>>();

                    // Previously-buffered payloads go first, then this tick's
                    // new batch.
                    let all_payloads = unsent_payloads.chain(payload_batch);

                    // Re-enqueue the whole batch for the next tick only when
                    // `latest_leader` is absent; once a leader exists, the
                    // buffer drains (payloads are sent below instead).
                    unsent_payloads_complete.complete_next_tick(
                        all_payloads.clone().continue_unless(latest_leader.clone()),
                    );

                    // Pair each payload with the current leader ID (empty when
                    // there is no leader) and release across ticks.
                    all_payloads.cross_singleton(latest_leader).all_ticks()
                }
                // Route each payload to the member identified as leader.
                .map(q!(move |(payload, leader_id)| (leader_id, payload)))
                .demux_bincode(&leaders)
                .values();

                let payloads_at_proposer = {
                    payloads_at_proposer.assume_ordering(nondet!(
                        /// documented non-determinism in interleaving of client payloads
                        nondet_order
                    ))
                };

                payloads_at_proposer
            },
            checkpoints,
            nondet!(
                /// non-deterministic leader changes may lead to sending to a stale leader, which will drop payloads
                nondet_commit
            ),
            nondet_commit,
        )
    }
}
117}