//! hydro_test/cluster/paxos_with_client.rs
//!
//! Client-facing wrapper around a Paxos-like consensus implementation: the
//! [`PaxosLike`] trait abstracts over Paxos variants and provides a default
//! `with_client` driver that routes client payloads to the current leader.
1use std::fmt::Debug;
2
3use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
4use hydro_lang::location::{Location, MemberId};
5use hydro_lang::prelude::*;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use super::paxos::PaxosPayload;
10
11pub trait PaxosLike<'a>: Sized {
12    /// The nodes that receive inputs in Paxos. Usually the proposers.
13    type PaxosIn: 'a;
14
15    /// The nodes that store logs in Paxos. Usually the acceptors.
16    type PaxosLog: 'a;
17
18    /// The nodes that output the results of Paxos. Proposers in Paxos, Proxy leaders in Compartmentalized Paxos.
19    type PaxosOut: 'a;
20    type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;
21
22    fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;
23    fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;
24
25    fn get_recipient_from_ballot<L: Location<'a>>(
26        ballot: Optional<Self::Ballot, L, Unbounded>,
27    ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded>;
28
29    /// # Non-Determinism
30    /// During leader-reelection, the latest known leader may be stale, which may
31    /// result in non-deterministic dropping of payloads.
32    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
33    fn build<P: PaxosPayload>(
34        self,
35        payload_generator: impl FnOnce(
36            Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
37        ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
38        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
39        nondet_leader: NonDet,
40        nondet_commit: NonDet,
41    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;
42
43    /// # Non-Determinism
44    /// During leader-reelection, the latest known leader may be stale, which may
45    /// result in non-deterministic dropping of payloads. Also, payloads across
46    /// clients will be arbitrarily interleaved as they arrive at the leader.
47    #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
48    fn with_client<C: 'a, P: PaxosPayload>(
49        self,
50        clients: &Cluster<'a, C>,
51        payloads: Stream<P, Cluster<'a, C>, Unbounded>,
52        checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
53        nondet_commit: NonDet,
54        nondet_order: NonDet,
55    ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
56        let leaders = self.payload_recipients().clone();
57
58        self.build(
59            move |new_leader_elected| {
60                let cur_leader_id = Self::get_recipient_from_ballot(
61                    new_leader_elected
62                        .broadcast(clients, TCP.fail_stop().bincode(), nondet!(/** TODO */))
63                        .values()
64                        .inspect(q!(|ballot| println!(
65                            "Client notified that leader was elected: {:?}",
66                            ballot
67                        )))
68                        .max(),
69                );
70
71                let payloads_at_proposer = sliced! {
72                    let mut unsent_payloads = use::state_null::<Stream<_, _, _, TotalOrder>>();
73
74                    let payload_batch = use(payloads, nondet!(/** see below */));
75                    let latest_leader = use(cur_leader_id,
76                        nondet!(
77                            /// the risk here is that we send a batch of requests
78                            /// with a stale leader ID, but because the leader ID comes from the
79                            /// network there is no way to guarantee that it is up to date. This
80                            /// is documented non-determinism.
81                            nondet_commit
82                        )
83                    );
84
85                    let all_payloads = unsent_payloads.chain(payload_batch);
86
87                    unsent_payloads = all_payloads.clone().filter_if(latest_leader.clone().is_none());
88                    all_payloads.cross_singleton(latest_leader)
89                }
90                .map(q!(move |(payload, leader_id)| (leader_id, payload)))
91                .demux(&leaders, TCP.fail_stop().bincode())
92                .values();
93
94                let payloads_at_proposer = {
95                    payloads_at_proposer.assume_ordering(nondet!(
96                        /// documented non-determinism in interleaving of client payloads
97                        nondet_order
98                    ))
99                };
100
101                payloads_at_proposer
102            },
103            checkpoints,
104            nondet!(
105                /// non-deterministic leader changes may lead to sending to a stale leader, which will drop payloads
106                nondet_commit
107            ),
108            nondet_commit,
109        )
110    }
111}