hydro_test/cluster/
paxos_with_client.rs1use std::fmt::Debug;
2
3use hydro_lang::location::MemberId;
4use hydro_lang::*;
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use super::paxos::PaxosPayload;
9
10pub trait PaxosLike<'a>: Sized {
11 type PaxosIn: 'a;
13
14 type PaxosLog: 'a;
16
17 type PaxosOut: 'a;
19 type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;
20
21 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn>;
22 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog>;
23
24 fn get_recipient_from_ballot<L: Location<'a>>(
25 ballot: Optional<Self::Ballot, L, Unbounded>,
26 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded>;
27
28 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
32 fn build<P: PaxosPayload>(
33 self,
34 payload_generator: impl FnOnce(
35 Stream<Self::Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
36 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
37 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
38 nondet_leader: NonDet,
39 nondet_commit: NonDet,
40 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder>;
41
42 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
47 fn with_client<C: 'a, P: PaxosPayload>(
48 self,
49 clients: &Cluster<'a, C>,
50 payloads: Stream<P, Cluster<'a, C>, Unbounded>,
51 checkpoints: Optional<usize, Cluster<'a, Self::PaxosLog>, Unbounded>,
52 nondet_commit: NonDet,
53 nondet_order: NonDet,
54 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
55 let leaders = self.payload_recipients().clone();
56
57 self.build(
58 move |new_leader_elected| {
59 let cur_leader_id = Self::get_recipient_from_ballot(
60 new_leader_elected
61 .broadcast_bincode(clients, nondet!())
62 .values()
63 .inspect(q!(|ballot| println!(
64 "Client notified that leader was elected: {:?}",
65 ballot
66 )))
67 .max(),
68 );
69
70 let payloads_at_proposer = {
71 let client_tick = clients.tick();
72 let payload_batch = payloads.batch(&client_tick, nondet!());
73
74 let latest_leader = cur_leader_id.snapshot(
75 &client_tick,
76 nondet!(
77 nondet_commit
82 ),
83 );
84
85 let (unsent_payloads_complete, unsent_payloads) =
86 client_tick.cycle::<Stream<_, _, _, TotalOrder>>();
87
88 let all_payloads = unsent_payloads.chain(payload_batch);
89
90 unsent_payloads_complete.complete_next_tick(
91 all_payloads.clone().continue_unless(latest_leader.clone()),
92 );
93
94 all_payloads.cross_singleton(latest_leader).all_ticks()
95 }
96 .map(q!(move |(payload, leader_id)| (leader_id, payload)))
97 .demux_bincode(&leaders)
98 .values();
99
100 let payloads_at_proposer = {
101 payloads_at_proposer.assume_ordering(nondet!(
102 nondet_order
104 ))
105 };
106
107 payloads_at_proposer
108 },
109 checkpoints,
110 nondet!(
111 nondet_commit
113 ),
114 nondet_commit,
115 )
116 }
117}