1use std::collections::HashMap;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::{Atomic, Location, MemberId};
5use hydro_lang::prelude::*;
6use hydro_std::quorum::collect_quorum;
7use hydro_std::request_response::join_responses;
8use serde::{Deserialize, Serialize};
9
10use super::paxos::{
11 Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
12 index_payloads, leader_election, recommit_after_leader_election,
13};
14use super::paxos_with_client::PaxosLike;
15
16#[derive(Serialize, Deserialize, Clone)]
17pub struct ProxyLeader {}
18
19#[derive(Clone, Copy)]
20pub struct CompartmentalizedPaxosConfig {
21 pub paxos_config: PaxosConfig,
22 pub num_proxy_leaders: usize,
23 pub acceptor_grid_rows: usize,
25 pub acceptor_grid_cols: usize,
27 pub num_replicas: usize,
28 pub acceptor_retry_timeout: u64,
30}
31
32pub struct CoreCompartmentalizedPaxos<'a> {
33 pub proposers: Cluster<'a, Proposer>,
34 pub proxy_leaders: Cluster<'a, ProxyLeader>,
35 pub acceptors: Cluster<'a, Acceptor>,
36 pub config: CompartmentalizedPaxosConfig,
37}
38
39impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
40 type PaxosIn = Proposer;
41 type PaxosLog = Acceptor;
42 type PaxosOut = ProxyLeader;
43 type Ballot = Ballot;
44
45 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
46 &self.proposers
47 }
48
49 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
50 &self.acceptors
51 }
52
53 fn get_recipient_from_ballot<L: Location<'a>>(
54 ballot: Optional<Self::Ballot, L, Unbounded>,
55 ) -> Optional<MemberId<Self::PaxosIn>, L, Unbounded> {
56 ballot.map(q!(|ballot| ballot.proposer_id))
57 }
58
59 fn build<P: PaxosPayload>(
60 self,
61 with_ballot: impl FnOnce(
62 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
63 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
64 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
65 nondet_leader: NonDet,
66 nondet_commit: NonDet,
67 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
68 compartmentalized_paxos_core(
69 &self.proposers,
70 &self.proxy_leaders,
71 &self.acceptors,
72 a_checkpoint,
73 with_ballot,
74 self.config,
75 nondet_leader,
76 nondet_commit,
77 )
78 .1
79 }
80}
81
82#[expect(
102 clippy::type_complexity,
103 clippy::too_many_arguments,
104 reason = "internal paxos code // TODO"
105)]
106pub fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
107 proposers: &Cluster<'a, Proposer>,
108 proxy_leaders: &Cluster<'a, ProxyLeader>,
109 acceptors: &Cluster<'a, Acceptor>,
110 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
111 c_to_proposers: impl FnOnce(
112 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
113 ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
114 config: CompartmentalizedPaxosConfig,
115 nondet_leader: NonDet,
116 nondet_commit_leader_change: NonDet,
117) -> (
118 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
119 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
120) {
121 proposers
122 .source_iter(q!(["Proposers say hello"]))
123 .for_each(q!(|s| println!("{}", s)));
124
125 proxy_leaders
126 .source_iter(q!(["Proxy leaders say hello"]))
127 .for_each(q!(|s| println!("{}", s)));
128
129 acceptors
130 .source_iter(q!(["Acceptors say hello"]))
131 .for_each(q!(|s| println!("{}", s)));
132
133 let proposer_tick = proposers.tick();
134 let proxy_leader_tick = proxy_leaders.tick();
135 let acceptor_tick = acceptors.tick();
136
137 let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
138 proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
139 let (a_log_complete_cycle, a_log_forward_reference) =
140 acceptor_tick.forward_ref::<Singleton<_, _, _>>();
141
142 let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
143 proposers,
144 acceptors,
145 &proposer_tick,
146 &acceptor_tick,
147 config.acceptor_grid_rows,
148 config.acceptor_grid_rows * config.acceptor_grid_cols,
149 config.paxos_config,
150 sequencing_max_ballot_forward_reference,
151 a_log_forward_reference,
152 nondet!(
153 nondet_leader
157 ),
158 nondet!(
159 ),
162 );
163
164 let was_not_leader = p_is_leader
165 .clone()
166 .map(q!(|is_leader| is_leader.then_some(())))
167 .into_optional()
168 .defer_tick()
169 .is_none();
170 let just_became_leader = p_is_leader.clone().and(was_not_leader);
171
172 let c_to_proposers = c_to_proposers(
173 p_ballot
174 .clone()
175 .filter_if(just_became_leader.clone())
176 .all_ticks(),
177 );
178
179 let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload(
180 proposers,
181 proxy_leaders,
182 acceptors,
183 &proposer_tick,
184 &proxy_leader_tick,
185 &acceptor_tick,
186 c_to_proposers,
187 a_checkpoint,
188 p_ballot.clone(),
189 p_is_leader,
190 p_relevant_p1bs,
191 config,
192 a_max_ballot,
193 nondet!(
194 nondet_commit_leader_change
199 ),
200 );
201
202 a_log_complete_cycle.complete(a_log.snapshot_atomic(
203 &acceptor_tick,
204 nondet!(
205 ),
209 ));
210 sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
211
212 (
213 p_ballot.filter_if(just_became_leader).all_ticks(),
215 p_to_replicas,
216 )
217}
218
219#[expect(
220 clippy::type_complexity,
221 clippy::too_many_arguments,
222 reason = "internal paxos code // TODO"
223)]
224fn sequence_payload<'a, P: PaxosPayload>(
225 proposers: &Cluster<'a, Proposer>,
226 proxy_leaders: &Cluster<'a, ProxyLeader>,
227 acceptors: &Cluster<'a, Acceptor>,
228 proposer_tick: &Tick<Cluster<'a, Proposer>>,
229 proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
230 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
231 c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
232 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
233
234 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
235 p_is_leader: Singleton<bool, Tick<Cluster<'a, Proposer>>, Bounded>,
236
237 p_relevant_p1bs: Stream<
238 (Option<usize>, HashMap<usize, LogValue<P>>),
239 Tick<Cluster<'a, Proposer>>,
240 Bounded,
241 NoOrder,
242 >,
243 config: CompartmentalizedPaxosConfig,
244 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
245 nondet_commit_leader_change: NonDet,
246) -> (
247 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
248 Singleton<
249 (Option<usize>, HashMap<usize, LogValue<P>>),
250 Atomic<Cluster<'a, Acceptor>>,
251 Unbounded,
252 >,
253 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
254) {
255 let (p_log_to_recommit, p_max_slot) =
256 recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);
257
258 let p_indexed_payloads = index_payloads(
259 p_max_slot,
260 c_to_proposers
261 .batch(
262 proposer_tick,
263 nondet!(
264 nondet_commit_leader_change
270 ),
271 )
272 .filter_if(p_is_leader.clone()),
273 );
274
275 let num_proxy_leaders = config.num_proxy_leaders;
276 let p_to_proxy_leaders_p2a = p_indexed_payloads
277 .cross_singleton(p_ballot.clone())
278 .map(q!(move |((slot, payload), ballot)| (
279 MemberId::<ProxyLeader>::from_raw_id((slot % num_proxy_leaders) as u32),
280 ((slot, ballot), Some(payload))
281 )))
282 .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
283 MemberId::<ProxyLeader>::from_raw_id((slot % num_proxy_leaders) as u32),
284 ((slot, ballot), payload)
285 ))))
286 .all_ticks()
287 .demux(proxy_leaders, TCP.fail_stop().bincode())
288 .values()
289 .atomic();
290
291 let num_acceptor_rows = config.acceptor_grid_rows;
293 let num_acceptor_cols = config.acceptor_grid_cols;
294 let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
295 .clone()
296 .flat_map_unordered(q!(move |((slot, ballot), payload)| {
297 let row = slot % num_acceptor_rows;
298 (0..num_acceptor_cols).map(move |i| {
299 (
300 MemberId::<Acceptor>::from_raw_id((row * num_acceptor_cols + i) as u32),
301 P2a {
302 sender: MemberId::<ProxyLeader>::from_raw_id(
303 (slot % num_proxy_leaders) as u32,
304 ),
305 slot,
306 ballot: ballot.clone(),
307 value: payload.clone(),
308 },
309 )
310 })
311 }))
312 .end_atomic()
313 .demux(acceptors, TCP.fail_stop().bincode())
314 .values();
315
316 let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
317 acceptor_tick,
318 a_max_ballot.clone(),
319 pl_to_acceptors_p2a_thrifty,
320 a_checkpoint,
321 proxy_leaders,
322 );
323
324 let (quorums, fails) = collect_quorum(
327 a_to_proxy_leaders_p2b,
328 config.acceptor_grid_cols,
329 config.acceptor_grid_cols,
330 );
331
332 let pl_to_replicas = join_responses(
333 quorums.map(q!(|k| (k, ()))),
334 p_to_proxy_leaders_p2a.batch_atomic(
335 proxy_leader_tick,
336 nondet!(
337 ),
341 ),
342 );
343
344 let pl_failed_p2b_to_proposer = fails
345 .map(q!(|(_, ballot)| (ballot.proposer_id.clone(), ballot)))
346 .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
347 .demux(proposers, TCP.fail_stop().bincode())
348 .values();
349
350 (
351 pl_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))),
352 a_log,
353 pl_failed_p2b_to_proposer,
354 )
355}