1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::location::cluster::CLUSTER_SELF_ID;
3use hydro_lang::prelude::*;
4use hydro_std::bench_client::{
5 BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
6};
7
8use super::paxos_with_client::PaxosLike;
9use crate::cluster::paxos_bench::inc_i32_workload_generator;
10
11pub struct Client;
12pub struct Aggregator;
13
14#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
15pub fn paxos_log_bench<'a>(
16 checkpoint_frequency: usize, paxos: impl PaxosLike<'a>,
18 clients: &Cluster<'a, Client>,
19 num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
20 client_aggregator: &Process<'a, Aggregator>,
21 client_interval_millis: u64,
22 aggregate_interval_millis: u64,
23 print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
24) {
25 let latencies = bench_client(
26 clients,
27 num_clients_per_node,
28 inc_i32_workload_generator,
29 |input| {
30 let acceptors = paxos.log_stores().clone();
31 let (acceptor_checkpoint_complete, acceptor_checkpoint) =
32 acceptors.forward_ref::<Optional<_, _, _>>();
33
34 let sequenced_payloads = paxos.with_client(
35 clients,
36 input
37 .entries()
38 .map(q!(move |(virtual_id, payload)| {
39 (virtual_id, (CLUSTER_SELF_ID.clone(), payload))
41 }))
42 .assume_ordering(nondet!()),
43 acceptor_checkpoint,
44 nondet!(),
46 nondet!(
47 ),
50 );
51
52 let sequenced_to_clients = sequenced_payloads
54 .clone()
55 .map(q!(|(_seq, payload)| {
56 let (virtual_id, (client_location, value)) = payload.unwrap();
57 (client_location, (virtual_id, value))
58 }))
59 .demux(clients, TCP.fail_stop().bincode())
60 .values()
61 .into_keyed();
62
63 let nondet_log_holes = nondet!();
65 let p_checkpoint = sliced! {
66 let new_slots = use(sequenced_payloads.into_keyed().keys(), nondet_log_holes);
67 let mut log_holes = use::state_null::<Stream<usize, Tick<_>, Bounded, NoOrder>>();
68 let mut prev_checkpoint_slot = use::state::<Singleton<usize, Tick<_>, Bounded>>(|l| l.singleton(q!(0)));
69
70 let max_contiguous_slot = log_holes.clone().min().unwrap_or_default();
72 let new_checkpoint = max_contiguous_slot
73 .zip(prev_checkpoint_slot.clone())
74 .filter_map(q!(move |(max_contiguous, prev_checkpoint)|
75 (max_contiguous - prev_checkpoint >= checkpoint_frequency).then_some(max_contiguous)));
76 prev_checkpoint_slot = new_checkpoint.clone().unwrap_or(prev_checkpoint_slot);
77
78 let max_log_hole = log_holes.clone().max().unwrap_or_default();
80 let max_new_slot = new_slots.clone().max();
81 let new_potential_holes = max_log_hole
83 .zip(max_new_slot)
84 .flat_map_unordered(q!(|(max_hole, max_new_slot)| max_hole+1..max_new_slot+2));
85 let new_holes = new_potential_holes.chain(log_holes.clone())
86 .filter_not_in(new_slots);
87 log_holes = new_holes;
88
89 new_checkpoint.into_stream()
90 };
91 let a_checkpoint = p_checkpoint
92 .broadcast(&acceptors, TCP.fail_stop().bincode(), nondet!())
93 .values()
94 .max();
95 acceptor_checkpoint_complete.complete(a_checkpoint);
96
97 sequenced_to_clients
98 },
99 )
100 .entries()
101 .map(q!(|(_virtual_client_id, (_output, latency))| latency));
102
103 let bench_results = compute_throughput_latency(
105 clients,
106 latencies,
107 client_interval_millis,
108 nondet!(),
109 );
110 let aggregate_results =
111 aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
112 print_results(aggregate_results);
113}
114
115#[cfg(test)]
116mod tests {
117 use hydro_deploy::Deployment;
118 use hydro_lang::deploy::{DeployCrateWrapper, TrybuildHost};
119
120 #[cfg(stageleft_runtime)]
121 use crate::cluster::paxos::{CorePaxos, PaxosConfig};
122
123 const PAXOS_F: usize = 1;
124
125 #[cfg(stageleft_runtime)]
126 fn create_paxos<'a>(
127 proposers: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Proposer>,
128 acceptors: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Acceptor>,
129 clients: &hydro_lang::location::Cluster<'a, super::Client>,
130 client_aggregator: &hydro_lang::location::Process<'a, super::Aggregator>,
131 ) {
132 use hydro_lang::location::Location;
133 use hydro_std::bench_client::pretty_print_bench_results;
134 use stageleft::q;
135
136 super::paxos_log_bench(
137 1000,
138 CorePaxos {
139 proposers: proposers.clone(),
140 acceptors: acceptors.clone(),
141 paxos_config: PaxosConfig {
142 f: 1,
143 i_am_leader_send_timeout: 5,
144 i_am_leader_check_timeout: 10,
145 i_am_leader_check_timeout_delay_multiplier: 15,
146 },
147 },
148 clients,
149 clients.singleton(q!(100usize)),
150 client_aggregator,
151 100,
152 1000,
153 pretty_print_bench_results,
154 );
155 }
156
157 #[tokio::test]
158 async fn paxos_log_some_throughput() {
159 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
160 let proposers = builder.cluster();
161 let acceptors = builder.cluster();
162 let clients = builder.cluster();
163 let client_aggregator = builder.process();
164
165 create_paxos(&proposers, &acceptors, &clients, &client_aggregator);
166 let mut deployment = Deployment::new();
167
168 let nodes = builder
169 .with_cluster(
170 &proposers,
171 (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
172 )
173 .with_cluster(
174 &acceptors,
175 (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
176 )
177 .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
178 .with_process(
179 &client_aggregator,
180 TrybuildHost::new(deployment.Localhost()),
181 )
182 .deploy(&mut deployment);
183
184 deployment.deploy().await.unwrap();
185
186 let client_node = &nodes.get_process(&client_aggregator);
187 let client_out = client_node.stdout_filter("Throughput:");
188
189 deployment.start().await.unwrap();
190
191 use std::str::FromStr;
192
193 use regex::Regex;
194
195 let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
196 let mut found = 0;
197 let mut client_out = client_out;
198 while let Some(line) = client_out.recv().await {
199 if let Some(caps) = re.captures(&line)
200 && let Ok(lower) = f64::from_str(&caps[1])
201 && 0.0 < lower
202 {
203 println!("Found throughput lower-bound: {}", lower);
204 found += 1;
205 if found == 2 {
206 break;
207 }
208 }
209 }
210 }
211}