1use std::collections::HashMap;
2
3use hydro_lang::*;
4use hydro_std::quorum::collect_quorum;
5use hydro_std::request_response::join_responses;
6use serde::{Deserialize, Serialize};
7
8use super::paxos::{
9 Acceptor, Ballot, LogValue, P2a, PaxosConfig, PaxosPayload, Proposer, acceptor_p2,
10 index_payloads, leader_election, recommit_after_leader_election,
11};
12use super::paxos_with_client::PaxosLike;
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct ProxyLeader {}
16
17#[derive(Clone, Copy)]
18pub struct CompartmentalizedPaxosConfig {
19 pub paxos_config: PaxosConfig,
20 pub num_proxy_leaders: usize,
21 pub acceptor_grid_rows: usize,
23 pub acceptor_grid_cols: usize,
25 pub num_replicas: usize,
26 pub acceptor_retry_timeout: u64,
28}
29
30pub struct CoreCompartmentalizedPaxos<'a> {
31 pub proposers: Cluster<'a, Proposer>,
32 pub proxy_leaders: Cluster<'a, ProxyLeader>,
33 pub acceptors: Cluster<'a, Acceptor>,
34 pub config: CompartmentalizedPaxosConfig,
35}
36
37impl<'a> PaxosLike<'a> for CoreCompartmentalizedPaxos<'a> {
38 type PaxosIn = Proposer;
39 type PaxosLog = Acceptor;
40 type PaxosOut = ProxyLeader;
41 type Ballot = Ballot;
42
43 fn payload_recipients(&self) -> &Cluster<'a, Self::PaxosIn> {
44 &self.proposers
45 }
46
47 fn log_stores(&self) -> &Cluster<'a, Self::PaxosLog> {
48 &self.acceptors
49 }
50
51 fn get_recipient_from_ballot<L: Location<'a>>(
52 ballot: Optional<Self::Ballot, L, Unbounded>,
53 ) -> Optional<ClusterId<Self::PaxosIn>, L, Unbounded> {
54 ballot.map(q!(|ballot| ballot.proposer_id))
55 }
56
57 unsafe fn build<P: PaxosPayload>(
58 self,
59 with_ballot: impl FnOnce(
60 Stream<Ballot, Cluster<'a, Self::PaxosIn>, Unbounded>,
61 ) -> Stream<P, Cluster<'a, Self::PaxosIn>, Unbounded>,
62 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
63 ) -> Stream<(usize, Option<P>), Cluster<'a, Self::PaxosOut>, Unbounded, NoOrder> {
64 unsafe {
65 compartmentalized_paxos_core(
66 &self.proposers,
67 &self.proxy_leaders,
68 &self.acceptors,
69 a_checkpoint,
70 with_ballot,
71 self.config,
72 )
73 .1
74 }
75 }
76}
77
78#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
98pub unsafe fn compartmentalized_paxos_core<'a, P: PaxosPayload>(
99 proposers: &Cluster<'a, Proposer>,
100 proxy_leaders: &Cluster<'a, ProxyLeader>,
101 acceptors: &Cluster<'a, Acceptor>,
102 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
103 c_to_proposers: impl FnOnce(
104 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
105 ) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
106 config: CompartmentalizedPaxosConfig,
107) -> (
108 Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
109 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
110) {
111 proposers
112 .source_iter(q!(["Proposers say hello"]))
113 .for_each(q!(|s| println!("{}", s)));
114
115 proxy_leaders
116 .source_iter(q!(["Proxy leaders say hello"]))
117 .for_each(q!(|s| println!("{}", s)));
118
119 acceptors
120 .source_iter(q!(["Acceptors say hello"]))
121 .for_each(q!(|s| println!("{}", s)));
122
123 let proposer_tick = proposers.tick();
124 let proxy_leader_tick = proxy_leaders.tick();
125 let acceptor_tick = acceptors.tick();
126
127 let (sequencing_max_ballot_complete_cycle, sequencing_max_ballot_forward_reference) =
128 proposers.forward_ref::<Stream<Ballot, _, _, NoOrder>>();
129 let (a_log_complete_cycle, a_log_forward_reference) =
130 acceptor_tick.forward_ref::<Singleton<_, _, _>>();
131
132 let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe {
133 leader_election(
139 proposers,
140 acceptors,
141 &proposer_tick,
142 &acceptor_tick,
143 config.acceptor_grid_rows,
144 config.acceptor_grid_rows * config.acceptor_grid_cols,
145 config.paxos_config,
146 sequencing_max_ballot_forward_reference,
147 a_log_forward_reference,
148 )
149 };
150
151 let just_became_leader = p_is_leader
152 .clone()
153 .continue_unless(p_is_leader.clone().defer_tick());
154
155 let c_to_proposers = c_to_proposers(
156 just_became_leader
157 .clone()
158 .then(p_ballot.clone())
159 .all_ticks(),
160 );
161
162 let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe {
163 sequence_payload(
168 proposers,
169 proxy_leaders,
170 acceptors,
171 &proposer_tick,
172 &proxy_leader_tick,
173 &acceptor_tick,
174 c_to_proposers,
175 a_checkpoint,
176 p_ballot.clone(),
177 p_is_leader,
178 p_relevant_p1bs,
179 config,
180 a_max_ballot,
181 )
182 };
183
184 a_log_complete_cycle.complete(unsafe {
185 a_log.latest_tick()
189 });
190 sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots);
191
192 (
193 just_became_leader.then(p_ballot).all_ticks(),
195 p_to_replicas,
196 )
197}
198
199#[expect(
200 clippy::type_complexity,
201 clippy::too_many_arguments,
202 reason = "internal paxos code // TODO"
203)]
204unsafe fn sequence_payload<'a, P: PaxosPayload>(
205 proposers: &Cluster<'a, Proposer>,
206 proxy_leaders: &Cluster<'a, ProxyLeader>,
207 acceptors: &Cluster<'a, Acceptor>,
208 proposer_tick: &Tick<Cluster<'a, Proposer>>,
209 proxy_leader_tick: &Tick<Cluster<'a, ProxyLeader>>,
210 acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
211 c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
212 a_checkpoint: Optional<usize, Cluster<'a, Acceptor>, Unbounded>,
213
214 p_ballot: Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
215 p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
216
217 p_relevant_p1bs: Stream<
218 (Option<usize>, HashMap<usize, LogValue<P>>),
219 Tick<Cluster<'a, Proposer>>,
220 Bounded,
221 NoOrder,
222 >,
223 config: CompartmentalizedPaxosConfig,
224 a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
225) -> (
226 Stream<(usize, Option<P>), Cluster<'a, ProxyLeader>, Unbounded, NoOrder>,
227 Singleton<
228 (Option<usize>, HashMap<usize, LogValue<P>>),
229 Atomic<Cluster<'a, Acceptor>>,
230 Unbounded,
231 >,
232 Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
233) {
234 let (p_log_to_recommit, p_max_slot) =
235 recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), config.paxos_config.f);
236
237 let p_indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
238 c_to_proposers
244 .tick_batch(proposer_tick)
245 .continue_if(p_is_leader.clone())
246 });
247
248 let num_proxy_leaders = config.num_proxy_leaders;
249 let p_to_proxy_leaders_p2a = p_indexed_payloads
250 .cross_singleton(p_ballot.clone())
251 .map(q!(move |((slot, payload), ballot)| (
252 ClusterId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
253 ((slot, ballot), Some(payload))
254 )))
255 .chain(p_log_to_recommit.map(q!(move |((slot, ballot), payload)| (
256 ClusterId::<ProxyLeader>::from_raw((slot % num_proxy_leaders) as u32),
257 ((slot, ballot), payload)
258 ))))
259 .all_ticks()
260 .send_bincode_anonymous(proxy_leaders);
261
262 let num_acceptor_rows = config.acceptor_grid_rows;
264 let num_acceptor_cols = config.acceptor_grid_cols;
265 let pl_to_acceptors_p2a_thrifty = p_to_proxy_leaders_p2a
266 .clone()
267 .flat_map_unordered(q!(move |((slot, ballot), payload)| {
268 let row = slot % num_acceptor_rows;
269 let mut p2as = Vec::new();
270 for i in 0..num_acceptor_cols {
271 p2as.push((
272 ClusterId::<Acceptor>::from_raw((row * num_acceptor_cols + i) as u32),
273 P2a {
274 sender: ClusterId::<ProxyLeader>::from_raw(
275 (slot % num_proxy_leaders) as u32,
276 ),
277 slot,
278 ballot,
279 value: payload.clone(),
280 },
281 ));
282 }
283 p2as
284 }))
285 .send_bincode_anonymous(acceptors);
286
287 let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
288 acceptor_tick,
289 a_max_ballot.clone(),
290 pl_to_acceptors_p2a_thrifty,
291 a_checkpoint,
292 proxy_leaders,
293 );
294
295 let (quorums, fails) = collect_quorum(
298 a_to_proxy_leaders_p2b.atomic(proxy_leader_tick),
299 config.acceptor_grid_cols,
300 config.acceptor_grid_cols,
301 );
302
303 let pl_to_replicas = join_responses(proxy_leader_tick, quorums.map(q!(|k| (k, ()))), unsafe {
304 p_to_proxy_leaders_p2a.tick_batch(proxy_leader_tick)
305 });
306
307 let pl_failed_p2b_to_proposer = fails
308 .map(q!(|(_, ballot)| (ballot.proposer_id, ballot)))
309 .inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
310 .send_bincode_anonymous(proposers);
311
312 (
313 pl_to_replicas
314 .map(q!(|((slot, _ballot), (value, _))| (slot, value)))
315 .end_atomic(),
316 a_log,
317 pl_failed_p2b_to_proposer,
318 )
319}