hydro_test/cluster/
paxos_with_client.rs1use std::fmt::Debug;
2
3use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
4use hydro_lang::location::{Location, MemberId};
5use hydro_lang::prelude::*;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use super::paxos::PaxosPayload;
10
11pub trait PaxosLike<'a>: Sized {
12 type PaxosIn: 'a;
14
15 type PaxosLog: 'a;
17
18 type PaxosOut: 'a;
20 type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;
21
22 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;
23 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;
24
25 fn get_recipient_from_ballot<L: Location<'a>>(
26 ballot: Optional<Self::Ballot, L, Unbounded>,
27 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded>;
28
29 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
33 fn build<P: PaxosPayload>(
34 self,
35 payload_generator: impl FnOnce(
36 Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
37 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
38 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
39 nondet_leader: NonDet,
40 nondet_commit: NonDet,
41 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;
42
43 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
48 fn with_client<C: 'a, P: PaxosPayload>(
49 self,
50 clients: &Cluster<'a, C>,
51 payloads: Stream<P, Cluster<'a, C>, Unbounded>,
52 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
53 nondet_commit: NonDet,
54 nondet_order: NonDet,
55 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
56 let leaders = self.payload_recipients().clone();
57
58 self.build(
59 move |new_leader_elected| {
60 let cur_leader_id = Self::get_recipient_from_ballot(
61 new_leader_elected
62 .broadcast(clients, TCP.fail_stop().bincode(), nondet!())
63 .values()
64 .inspect(q!(|ballot| println!(
65 "Client notified that leader was elected: {:?}",
66 ballot
67 )))
68 .max(),
69 );
70
71 let payloads_at_proposer = sliced! {
72 let mut unsent_payloads = use::state_null::<Stream<_, _, _, TotalOrder>>();
73
74 let payload_batch = use(payloads, nondet!());
75 let latest_leader = use(cur_leader_id,
76 nondet!(
77 nondet_commit
82 )
83 );
84
85 let all_payloads = unsent_payloads.chain(payload_batch);
86
87 unsent_payloads = all_payloads.clone().filter_if(latest_leader.clone().is_none());
88 all_payloads.cross_singleton(latest_leader)
89 }
90 .map(q!(move |(payload, leader_id)| (leader_id, payload)))
91 .demux(&leaders, TCP.fail_stop().bincode())
92 .values();
93
94 let payloads_at_proposer = {
95 payloads_at_proposer.assume_ordering(nondet!(
96 nondet_order
98 ))
99 };
100
101 payloads_at_proposer
102 },
103 checkpoints,
104 nondet!(
105 nondet_commit
107 ),
108 nondet_commit,
109 )
110 }
111}