1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::location::cluster::CLUSTER_SELF_ID;
3use hydro_lang::prelude::*;
4use hydro_std::bench_client::{
5 BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
6};
7use hydro_std::quorum::collect_quorum;
8
9use super::kv_replica::{KvPayload, Replica, kv_replica};
10use super::paxos_with_client::PaxosLike;
11
/// Marker type tagging the cluster of benchmark client nodes
/// (used as `Cluster<'a, Client>` by [`paxos_bench`]).
pub struct Client;
/// Marker type tagging the single process that receives the aggregated
/// benchmark results (used as `Process<'a, Aggregator>` by [`paxos_bench`]).
pub struct Aggregator;
14
15#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
16pub fn paxos_bench<'a>(
17 checkpoint_frequency: usize, f: usize, num_replicas: usize,
20 paxos: impl PaxosLike<'a>,
21 clients: &Cluster<'a, Client>,
22 num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
23 client_aggregator: &Process<'a, Aggregator>,
24 replicas: &Cluster<'a, Replica>,
25 client_interval_millis: u64,
26 aggregate_interval_millis: u64,
27 print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
28) {
29 let latencies = bench_client(
30 clients,
31 num_clients_per_node,
32 inc_i32_workload_generator,
33 |input| {
34 let acceptors = paxos.log_stores().clone();
35 let (acceptor_checkpoint_complete, acceptor_checkpoint) =
36 acceptors.forward_ref::<Optional<_, _, _>>();
37
38 let sequenced_payloads = paxos.with_client(
39 clients,
40 input
41 .entries()
42 .map(q!(move |(virtual_id, payload)| {
43 (virtual_id, (CLUSTER_SELF_ID.clone(), payload))
45 }))
46 .assume_ordering(nondet!()),
47 acceptor_checkpoint,
48 nondet!(),
50 nondet!(
51 ),
54 );
55
56 let sequenced_to_replicas = sequenced_payloads
57 .broadcast(replicas, TCP.fail_stop().bincode(), nondet!())
58 .values()
59 .map(q!(|(index, payload)| (
60 index,
61 payload.map(|(key, value)| KvPayload { key, value })
62 )));
63
64 let (replica_checkpoint, processed_payloads) =
66 kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);
67
68 let a_checkpoint = {
70 let a_checkpoint_largest_seqs = replica_checkpoint
71 .broadcast(&acceptors, TCP.fail_stop().bincode(), nondet!())
72 .reduce(q!(
73 |curr_seq, seq| {
74 if seq > *curr_seq {
75 *curr_seq = seq;
76 }
77 },
78 commutative = manual_proof!()
79 ));
80
81 sliced! {
82 let snapshot = use(a_checkpoint_largest_seqs, nondet!(
83 ));
86
87 let a_checkpoints_quorum_reached = snapshot
88 .clone()
89 .key_count()
90 .map(q!(move |num_received| num_received == f + 1));
91
92 snapshot
94 .entries()
95 .filter_if(a_checkpoints_quorum_reached)
96 .map(q!(|(_sender, seq)| seq))
97 .min()
98 }
99 };
100
101 acceptor_checkpoint_complete.complete(a_checkpoint);
102
103 let c_received_payloads = processed_payloads
104 .map(q!(|payload| (
105 payload.value.0,
106 ((payload.key, payload.value.1), Ok(()))
107 )))
108 .demux(clients, TCP.fail_stop().bincode())
109 .values();
110
111 collect_quorum::<_, _, _, ()>(c_received_payloads, f + 1, num_replicas)
113 .0
114 .into_keyed()
115 },
116 )
117 .entries()
118 .map(q!(|(_virtual_client_id, (_output, latency))| latency));
119
120 let bench_results = compute_throughput_latency(
122 clients,
123 latencies,
124 client_interval_millis,
125 nondet!(),
126 );
127 let aggregate_results =
128 aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
129 print_results(aggregate_results);
130}
131
132pub fn inc_i32_workload_generator<'a, Client>(
134 ids_and_prev_payloads: KeyedStream<u32, Option<i32>, Cluster<'a, Client>, Unbounded, NoOrder>,
135) -> KeyedStream<u32, i32, Cluster<'a, Client>, Unbounded, NoOrder> {
136 ids_and_prev_payloads.map(q!(move |payload| {
137 if let Some(counter) = payload {
138 counter + 1
139 } else {
140 0
141 }
142 }))
143}
144
145#[cfg(test)]
146mod tests {
147 use dfir_lang::graph::WriteConfig;
148 use hydro_deploy::Deployment;
149 use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
150
151 #[cfg(stageleft_runtime)]
152 use crate::cluster::paxos::{CorePaxos, PaxosConfig};
153
154 const PAXOS_F: usize = 1;
155
156 #[cfg(stageleft_runtime)]
157 fn create_paxos<'a>(
158 proposers: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Proposer>,
159 acceptors: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Acceptor>,
160 clients: &hydro_lang::location::Cluster<'a, super::Client>,
161 client_aggregator: &hydro_lang::location::Process<'a, super::Aggregator>,
162 replicas: &hydro_lang::location::Cluster<'a, crate::cluster::kv_replica::Replica>,
163 ) {
164 use hydro_lang::location::Location;
165 use hydro_std::bench_client::pretty_print_bench_results;
166 use stageleft::q;
167
168 super::paxos_bench(
169 1000,
170 PAXOS_F,
171 PAXOS_F + 1,
172 CorePaxos {
173 proposers: proposers.clone(),
174 acceptors: acceptors.clone(),
175 paxos_config: PaxosConfig {
176 f: 1,
177 i_am_leader_send_timeout: 5,
178 i_am_leader_check_timeout: 10,
179 i_am_leader_check_timeout_delay_multiplier: 15,
180 },
181 },
182 clients,
183 clients.singleton(q!(100usize)),
184 client_aggregator,
185 replicas,
186 100,
187 1000,
188 pretty_print_bench_results,
189 );
190 }
191
192 #[test]
193 fn paxos_ir() {
194 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
195 let proposers = builder.cluster();
196 let acceptors = builder.cluster();
197 let clients = builder.cluster();
198 let client_aggregator = builder.process();
199 let replicas = builder.cluster();
200
201 create_paxos(
202 &proposers,
203 &acceptors,
204 &clients,
205 &client_aggregator,
206 &replicas,
207 );
208 let mut built = builder.with_default_optimize::<HydroDeploy>();
209
210 hydro_lang::compile::ir::dbg_dedup_tee(|| {
211 hydro_build_utils::assert_debug_snapshot!(built.ir());
212 });
213
214 let preview = built.preview_compile();
215 hydro_build_utils::insta::with_settings!({
216 snapshot_suffix => "proposer_mermaid"
217 }, {
218 hydro_build_utils::assert_snapshot!(
219 preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
220 no_subgraphs: true,
221 no_pull_push: true,
222 no_handoffs: true,
223 op_text_no_imports: true,
224 ..WriteConfig::default()
225 })
226 );
227 });
228 hydro_build_utils::insta::with_settings!({
229 snapshot_suffix => "acceptor_mermaid"
230 }, {
231 hydro_build_utils::assert_snapshot!(
232 preview.dfir_for(&acceptors).to_mermaid(&WriteConfig {
233 no_subgraphs: true,
234 no_pull_push: true,
235 no_handoffs: true,
236 op_text_no_imports: true,
237 ..WriteConfig::default()
238 })
239 );
240 });
241 }
242
243 #[tokio::test]
244 async fn paxos_some_throughput() {
245 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
246 let proposers = builder.cluster();
247 let acceptors = builder.cluster();
248 let clients = builder.cluster();
249 let client_aggregator = builder.process();
250 let replicas = builder.cluster();
251
252 create_paxos(
253 &proposers,
254 &acceptors,
255 &clients,
256 &client_aggregator,
257 &replicas,
258 );
259 let mut deployment = Deployment::new();
260
261 let nodes = builder
262 .with_cluster(
263 &proposers,
264 (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
265 )
266 .with_cluster(
267 &acceptors,
268 (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
269 )
270 .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
271 .with_process(
272 &client_aggregator,
273 TrybuildHost::new(deployment.Localhost()),
274 )
275 .with_cluster(
276 &replicas,
277 (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
278 )
279 .deploy(&mut deployment);
280
281 deployment.deploy().await.unwrap();
282
283 let client_node = &nodes.get_process(&client_aggregator);
284 let client_out = client_node.stdout_filter("Throughput:");
285
286 deployment.start().await.unwrap();
287
288 use std::str::FromStr;
289
290 use regex::Regex;
291
292 let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
293 let mut found = 0;
294 let mut client_out = client_out;
295 while let Some(line) = client_out.recv().await {
296 if let Some(caps) = re.captures(&line)
297 && let Ok(lower) = f64::from_str(&caps[1])
298 && 0.0 < lower
299 {
300 println!("Found throughput lower-bound: {}", lower);
301 found += 1;
302 if found == 2 {
303 break;
304 }
305 }
306 }
307 }
308}