use hydro_lang::*;
use hydro_std::bench_client::{bench_client, print_bench_results};
use hydro_std::quorum::collect_quorum;

use super::kv_replica::{KvPayload, Replica, kv_replica};
use super::paxos_with_client::PaxosLike;

/// Marker type for the cluster of benchmark clients.
pub struct Client;
/// Marker type for the process that aggregates and prints benchmark results.
pub struct Aggregator;

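/// Benchmarks a Paxos-backed key-value store end to end.
///
/// Client-generated `(key, value)` requests are sequenced through the provided
/// `PaxosLike` implementation and applied by `num_replicas` replicas; each request
/// is acknowledged back to the issuing client once `f + 1` replicas have processed
/// it. Throughput and latency results are reported via the `client_aggregator` process.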
#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
pub fn paxos_bench<'a>(
    num_clients_per_node: usize,
    checkpoint_frequency: usize,
    f: usize,
    num_replicas: usize,
    paxos: impl PaxosLike<'a>,
    clients: &Cluster<'a, Client>,
    client_aggregator: &Process<'a, Aggregator>,
    replicas: &Cluster<'a, Replica>,
) {
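    // Pipeline handed to the benchmark harness: it takes the stream of client
    // requests and returns the stream of acknowledgments for completed requests.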
    let paxos_processor = |c_to_proposers: Stream<(u32, u32), Cluster<'a, Client>, Unbounded>| {
        let payloads = c_to_proposers.map(q!(move |(key, value)| KvPayload {
            key,
            value: (CLUSTER_SELF_ID, value),
        }));

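        // The acceptors need the latest replica checkpoint, which is computed
        // further below, so it is wired in here as a forward reference and
        // completed once `a_checkpoint` is available.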
        let acceptors = paxos.log_stores().clone();
        let (acceptor_checkpoint_complete, acceptor_checkpoint) =
            acceptors.forward_ref::<Optional<_, _, _>>();

        let sequenced_payloads = paxos.with_client(
            clients,
            payloads,
            acceptor_checkpoint,
            nondet!(),
            nondet!(),
        );

        let sequenced_to_replicas = sequenced_payloads
            .broadcast_bincode(replicas, nondet!())
            .values();

        let (replica_checkpoint, processed_payloads) =
            kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);

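        // Compute, on the acceptors, a checkpoint sequence number that at least
        // `f + 1` replicas have reached: the minimum of the largest checkpoint
        // reported by each replica, released only once a quorum has reported.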
        let checkpoint_tick = acceptors.tick();
        let a_checkpoint = {
            let a_checkpoint_largest_seqs = replica_checkpoint
                .broadcast_bincode(&acceptors, nondet!())
                .entries()
                .into_keyed()
                .reduce_commutative(q!(|curr_seq, seq| {
                    if seq > *curr_seq {
                        *curr_seq = seq;
                    }
                }))
                .snapshot(&checkpoint_tick, nondet!());

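            // Only expose a checkpoint once reports from `f + 1` distinct replicas
            // have arrived.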
            let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs
                .clone()
                .key_count()
                .filter_map(q!(move |num_received| if num_received == f + 1 {
                    Some(true)
                } else {
                    None
                }));

            a_checkpoint_largest_seqs
                .entries()
                .continue_if(a_checkpoints_quorum_reached)
                .map(q!(|(_sender, seq)| seq))
                .min()
                .latest()
        };

        // Resolve the forward reference created above with the computed checkpoint.
        acceptor_checkpoint_complete.complete(a_checkpoint);

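        // Send each processed payload back to the client that issued it, using the
        // sender ID that was embedded in the payload value above.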
        let c_received_payloads = processed_payloads
            .map(q!(|payload| (
                payload.value.0,
                ((payload.key, payload.value.1), Ok(()))
            )))
            .demux_bincode(clients)
            .values();

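        // A request counts as complete once `f + 1` of the `num_replicas` replicas
        // have acknowledged it.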
        collect_quorum::<_, _, _, ()>(
            c_received_payloads.atomic(&clients.tick()),
            f + 1,
            num_replicas,
        )
        .0
        .end_atomic()
    };

    let bench_results = bench_client(
        clients,
        inc_u32_workload_generator,
        paxos_processor,
        num_clients_per_node,
        nondet!(),
    );

    print_bench_results(bench_results, client_aggregator, clients);
}

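/// Benchmark workload: each virtual client repeatedly increments a `u32` counter,
/// starting from `0` when no previous payload has been observed.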
pub fn inc_u32_workload_generator<'a, Client>(
    _client: &Cluster<'a, Client>,
    payload_request: Stream<(u32, Option<u32>), Cluster<'a, Client>, Unbounded, NoOrder>,
) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder> {
    payload_request.map(q!(move |(virtual_id, payload)| {
        let value = if let Some(payload) = payload {
            payload + 1
        } else {
            0
        };
        (virtual_id, value)
    }))
}

#[cfg(test)]
mod tests {
    use dfir_lang::graph::WriteConfig;
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};

    use crate::cluster::paxos::{CorePaxos, PaxosConfig};

    const PAXOS_F: usize = 1;

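    // Wires `paxos_bench` to a concrete `CorePaxos` instance with fixed benchmark
    // parameters; shared by the tests below.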
    #[cfg(stageleft_runtime)]
    fn create_paxos<'a>(
        proposers: &hydro_lang::Cluster<'a, crate::cluster::paxos::Proposer>,
        acceptors: &hydro_lang::Cluster<'a, crate::cluster::paxos::Acceptor>,
        clients: &hydro_lang::Cluster<'a, super::Client>,
        client_aggregator: &hydro_lang::Process<'a, super::Aggregator>,
        replicas: &hydro_lang::Cluster<'a, crate::cluster::kv_replica::Replica>,
    ) {
        super::paxos_bench(
            100,  // num_clients_per_node
            1000, // checkpoint_frequency
            PAXOS_F,
            PAXOS_F + 1, // num_replicas
            CorePaxos {
                proposers: proposers.clone(),
                acceptors: acceptors.clone(),
                paxos_config: PaxosConfig {
                    f: 1,
                    i_am_leader_send_timeout: 5,
                    i_am_leader_check_timeout: 10,
                    i_am_leader_check_timeout_delay_multiplier: 15,
                },
            },
            clients,
            client_aggregator,
            replicas,
        );
    }

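    // Snapshot-tests the generated Hydro IR and the proposer/acceptor dataflow graphs.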
    #[test]
    fn paxos_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();
        let replicas = builder.cluster();

        create_paxos(
            &proposers,
            &acceptors,
            &clients,
            &client_aggregator,
            &replicas,
        );
        let built = builder.with_default_optimize::<HydroDeploy>();

        hydro_lang::ir::dbg_dedup_tee(|| {
            hydro_build_utils::assert_debug_snapshot!(built.ir());
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "proposer_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "acceptor_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&acceptors).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
    }

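    // Deploys the benchmark on localhost and passes once two throughput reports
    // with a positive lower bound have been observed.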
    #[tokio::test]
    async fn paxos_some_throughput() {
        let builder = hydro_lang::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();
        let replicas = builder.cluster();

        create_paxos(
            &proposers,
            &acceptors,
            &clients,
            &client_aggregator,
            &replicas,
        );
        let mut deployment = Deployment::new();

        let nodes = builder
            .with_cluster(
                &proposers,
                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(
                &acceptors,
                (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
            .with_process(
                &client_aggregator,
                TrybuildHost::new(deployment.Localhost()),
            )
            .with_cluster(
                &replicas,
                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Capture the aggregator's stdout before starting so no report is missed.
        let client_node = &nodes.get_process(&client_aggregator);
        let mut client_out = client_node.stdout_filter("Throughput:").await;

        deployment.start().await.unwrap();

        use std::str::FromStr;

        use regex::Regex;

        // Wait for two throughput reports whose lower bound is strictly positive.
        let re = Regex::new(r"Throughput: ([^ ]+) - ([^ ]+) - ([^ ]+) requests/s").unwrap();
        let mut found = 0;
        while let Some(line) = client_out.recv().await {
            if let Some(caps) = re.captures(&line)
                && let Ok(lower) = f64::from_str(&caps[1])
                && 0.0 < lower
            {
                println!("Found throughput lower-bound: {}", lower);
                found += 1;
                if found == 2 {
                    break;
                }
            }
        }
    }
}