1use std::collections::HashMap;
2
3use hydro_lang::*;
4use hydro_std::quorum::collect_quorum;
5use hydro_std::request_response::join_responses;
6use serde::{Deserialize, Serialize};
7
8use super::paxos::{
9 Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
10 index_payloads, leader_election, recommit_after_leader_election,
11};
12use super::paxos_with_client::PaxosLike;
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct ProxyLeader {}
16
17#[derive(Clone, Copy)]
18pub struct CompartmentalizedPaxosConfig {
19 pub paxos_config: PaxosConfig,
20 pub num_proxy_leaders: usize,
21 pub acceptor_grid_rows: usize,
23 pub acceptor_grid_cols: usize,
25 pub num_replicas: usize,
26 pub acceptor_retry_timeout: u64,
28}
29
30pub struct CoreCompartmentalizedPaxos<'a> {
31 pub proposers: Cluster<'a, Proposer>,
32 pub proxy_leaders: Cluster<'a, ProxyLeader>,
33 pub acceptors: Cluster<'a, Acceptor>,
34 pub config: CompartmentalizedPaxosConfig,
35}
36
37impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
38 type PaxosIn = Proposer;
39 type PaxosLog = Acceptor;
40 type PaxosOut = ProxyLeader;
41 type Ballot = Ballot;
42
43 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
44 &self.proposers
45 }
46
47 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
48 &self.acceptors
49 }
50
51 fn get_recipient_from_ballot<L: Location<'a>>(
52 ballot: Optional<Self::Ballot, L, Unbounded>,
53 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
54 ballot.map(q!(|ballot| ballot.proposer_id))
55 }
56
57 fn build<P: PaxosPayload>(
58 self,
59 with_ballot: impl FnOnce(
60 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
61 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
62 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
63 nondet_leader: NonDet,
64 nondet_commit: NonDet,
65 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
66 compartmentalized_paxos_core(
67 &self.proposers,
68 &self.proxy_leaders,
69 &self.acceptors,
70 a_checkpoint,
71 with_ballot,
72 self.config,
73 nondet_leader,
74 nondet_commit,
75 )
76 .1
77 }
78}
79
80#[expect(
100 clippy::type_complexity,
101 clippy::too_many_arguments,
102 reason = "internal paxos code // TODO"
103)]
104pub fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
105 proposers: &Cluster<'a, Proposer>,
106 proxy_leaders: &Cluster<'a, ProxyLeader>,
107 acceptors: &Cluster<'a, Acceptor>,
108 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
109 c_to_proposers: impl FnOnce(
110 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
111 ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
112 config: CompartmentalizedPaxosConfig,
113 nondet_leader: NonDet,
114 nondet_commit_leader_change: NonDet,
115) -> (
116 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
117 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
118) {
119 proposers
120 .source_iter(q!(["Proposers say hello"]))
121 .for_each(q!(|s| println!("{}", s)));
122
123 proxy_leaders
124 .source_iter(q!(["Proxy leaders say hello"]))
125 .for_each(q!(|s| println!("{}", s)));
126
127 acceptors
128 .source_iter(q!(["Acceptors say hello"]))
129 .for_each(q!(|s| println!("{}", s)));
130
131 let proposer_tick = proposers.tick();
132 let proxy_leader_tick = proxy_leaders.tick();
133 let acceptor_tick = acceptors.tick();
134
135 let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
136 proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
137 let (a_log_complete_cycle, a_log_forward_reference) =
138 acceptor_tick.forward_ref::<Singleton<_, _, _>>();
139
140 let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
141 proposers,
142 acceptors,
143 &proposer_tick,
144 &acceptor_tick,
145 config.acceptor_grid_rows,
146 config.acceptor_grid_rows * config.acceptor_grid_cols,
147 config.paxos_config,
148 sequencing_max_ballot_forward_reference,
149 a_log_forward_reference,
150 nondet!(
151 nondet_leader
155 ),
156 nondet!(
157 ),
160 );
161
162 let just_became_leader = p_is_leader
163 .clone()
164 .continue_unless(p_is_leader.clone().defer_tick());
165
166 let c_to_proposers = c_to_proposers(
167 just_became_leader
168 .clone()
169 .then(p_ballot.clone())
170 .all_ticks(),
171 );
172
173 let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
174 proposers,
175 proxy_leaders,
176 acceptors,
177 &proposer_tick,
178 &proxy_leader_tick,
179 &acceptor_tick,
180 c_to_proposers,
181 a_checkpoint,
182 p_ballot.clone(),
183 p_is_leader,
184 p_relevant_p1bs,
185 config,
186 a_max_ballot,
187 nondet!(
188 nondet_commit_leader_change
193 ),
194 );
195
196 a_log_complete_cycle.complete(a_log.snapshot(nondet!(
197 )));
201 sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
202
203 (
204 just_became_leader.then(p_ballot).all_ticks(),
206 p_to_replicas,
207 )
208}
209
210#[expect(
211 clippy::type_complexity,
212 clippy::too_many_arguments,
213 reason = "internal paxos code // TODO"
214)]
215fn sequence_payload<'a, P: PaxosPayload>(
216 proposers: &Cluster<'a, Proposer>,
217 proxy_leaders: &Cluster<'a, ProxyLeader>,
218 acceptors: &Cluster<'a, Acceptor>,
219 proposer_tick: &Tick<Cluster<'a, Proposer>>,
220 proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
221 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
222 c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
223 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
224
225 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
226 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
227
228 p_relevant_p1bs: Stream<
229 (Option<usize>, HashMap<usize, LogValue<P>>),
230 Tick<Cluster<'a, Proposer>>,
231 Bounded,
232 NoOrder,
233 >,
234 config: CompartmentalizedPaxosConfig,
235 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
236 nondet_commit_leader_change: NonDet,
237) -> (
238 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
239 Singleton<
240 (Option<usize>, HashMap<usize, LogValue<P>>),
241 Atomic<Cluster<'a, Acceptor>>,
242 Unbounded,
243 >,
244 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
245) {
246 let (p_log_to_recommit, p_max_slot) =
247 recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);
248
249 let p_indexed_payloads = index_payloads(
250 proposer_tick,
251 p_max_slot,
252 c_to_proposers
253 .batch(
254 proposer_tick,
255 nondet!(
256 nondet_commit_leader_change
262 ),
263 )
264 .continue_if(p_is_leader.clone()),
265 );
266
267 let num_proxy_leaders = config.num_proxy_leaders;
268 let p_to_proxy_leaders_p2a = p_indexed_payloads
269 .cross_singleton(p_ballot.clone())
270 .map(q!(move |((slot, payload), ballot)| (
271 MemberId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
272 ((slot, ballot), Some(payload))
273 )))
274 .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
275 MemberId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
276 ((slot, ballot), payload)
277 ))))
278 .all_ticks()
279 .demux_bincode(proxy_leaders)
280 .values();
281
282 let num_acceptor_rows = config.acceptor_grid_rows;
284 let num_acceptor_cols = config.acceptor_grid_cols;
285 let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
286 .clone()
287 .flat_map_unordered(q!(move |((slot, ballot), payload)| {
288 let row = slot % num_acceptor_rows;
289 let mut p2as = Vec::new();
290 for i in 0..num_acceptor_cols {
291 p2as.push((
292 MemberId::<Acceptor>::from_raw((row * num_acceptor_cols + i) as u32),
293 P2a {
294 sender: MemberId::<ProxyLeader>::from_raw(
295 (slot % num_proxy_leaders) as u32,
296 ),
297 slot,
298 ballot,
299 value: payload.clone(),
300 },
301 ));
302 }
303 p2as
304 }))
305 .demux_bincode(acceptors)
306 .values();
307
308 let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
309 acceptor_tick,
310 a_max_ballot.clone(),
311 pl_to_acceptors_p2a_thrifty,
312 a_checkpoint,
313 proxy_leaders,
314 );
315
316 let (quorums, fails) = collect_quorum(
319 a_to_proxy_leaders_p2b.atomic(proxy_leader_tick),
320 config.acceptor_grid_cols,
321 config.acceptor_grid_cols,
322 );
323
324 let pl_to_replicas = join_responses(
325 proxy_leader_tick,
326 quorums.map(q!(|k| (k, ()))),
327 p_to_proxy_leaders_p2a.batch(proxy_leader_tick, nondet!()),
328 );
329
330 let pl_failed_p2b_to_proposer = fails
331 .map(q!(|(_, ballot)| (ballot.proposer_id, ballot)))
332 .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
333 .end_atomic()
334 .demux_bincode(proposers)
335 .values();
336
337 (
338 pl_to_replicas
339 .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
340 .end_atomic(),
341 a_log,
342 pl_failed_p2b_to_proposer,
343 )
344}