hydro_test/cluster/paxos_bench.rs

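//! Benchmark harness that wires a Paxos implementation to a cluster of
//! benchmarking clients and a replicated key-value store, measuring
//! end-to-end throughput and latency.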
use hydro_lang::live_collections::stream::NoOrder;
use hydro_lang::location::cluster::CLUSTER_SELF_ID;
use hydro_lang::prelude::*;
use hydro_std::bench_client::{
    BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
};
use hydro_std::quorum::collect_quorum;

use super::kv_replica::{KvPayload, Replica, kv_replica};
use super::paxos_with_client::PaxosLike;

pub struct Client;
pub struct Aggregator;

#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
pub fn paxos_bench<'a>(
    checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
    f: usize, /* Maximum number of faulty nodes. A payload has been processed once f+1 replicas have processed it. */
    num_replicas: usize,
    paxos: impl PaxosLike<'a>,
    clients: &Cluster<'a, Client>,
    num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
    client_aggregator: &Process<'a, Aggregator>,
    replicas: &Cluster<'a, Replica>,
    client_interval_millis: u64,
    aggregate_interval_millis: u64,
    print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
) {
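    // End-to-end benchmark loop: clients generate payloads, Paxos sequences them,
    // replicas apply them, and acknowledgments flow back to the originating
    // clients so that each transaction's latency can be measured.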
    let latencies = bench_client(
        clients,
        num_clients_per_node,
        inc_i32_workload_generator,
        |input| {
            let acceptors = paxos.log_stores().clone();
            let (acceptor_checkpoint_complete, acceptor_checkpoint) =
                acceptors.forward_ref::<Optional<_, _, _>>();

            let sequenced_payloads = paxos.with_client(
                clients,
                input
                    .entries()
                    .map(q!(move |(virtual_id, payload)| {
                        // Append Client ID so replicas know who to contact later
                        (virtual_id, (CLUSTER_SELF_ID.clone(), payload))
                    }))
                    .assume_ordering(nondet!(/** benchmarking, order actually doesn't matter */)),
                acceptor_checkpoint,
                // TODO(shadaj): we should retry when a payload is dropped due to stale leader
                nondet!(/** benchmarking, assuming no re-election */),
                nondet!(
                    /// clients 'own' certain keys, so interleaving elements from clients will not affect
                    /// the order of writes to the same key
                ),
            );

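            // Broadcast the sequenced (slot index, payload) pairs to every replica,
            // re-shaping each payload into a KvPayload along the way.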
            let sequenced_to_replicas = sequenced_payloads
                .broadcast(replicas, TCP.fail_stop().bincode(), nondet!(/** TODO */))
                .values()
                .map(q!(|(index, payload)| (
                    index,
                    payload.map(|(key, value)| KvPayload { key, value })
                )));

            // Replicas
            let (replica_checkpoint, processed_payloads) =
                kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);

            // Get the latest checkpoint sequence per replica
            let a_checkpoint = {
                let a_checkpoint_largest_seqs = replica_checkpoint
                    .broadcast(&acceptors, TCP.fail_stop().bincode(), nondet!(/** TODO */))
                    .reduce(q!(
                        |curr_seq, seq| {
                            if seq > *curr_seq {
                                *curr_seq = seq;
                            }
                        },
                        commutative = manual_proof!(/** max is commutative */)
                    ));

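                // Truncation is safe here: once f + 1 replicas have reported a
                // checkpoint, the minimum reported sequence number has been applied
                // by at least f + 1 replicas, so the acceptors may garbage-collect
                // log entries at or below it.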
                sliced! {
                    let snapshot = use(a_checkpoint_largest_seqs, nondet!(
                        /// even though we batch the checkpoint messages, because we reduce over the entire history,
                        /// the final min checkpoint is deterministic
                    ));

                    let a_checkpoints_quorum_reached = snapshot
                        .clone()
                        .key_count()
                        .map(q!(move |num_received| num_received == f + 1));

                    // Find the smallest checkpoint seq that everyone agrees to
                    snapshot
                        .entries()
                        .filter_if(a_checkpoints_quorum_reached)
                        .map(q!(|(_sender, seq)| seq))
                        .min()
                }
            };

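            // Complete the forward reference declared at the top of this closure,
            // closing the cycle: replica checkpoints feed back into the acceptors.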
            acceptor_checkpoint_complete.complete(a_checkpoint);

            let c_received_payloads = processed_payloads
                .map(q!(|payload| (
                    payload.value.0,
                    ((payload.key, payload.value.1), Ok(()))
                )))
                .demux(clients, TCP.fail_stop().bincode())
                .values();

            // we only mark a transaction as committed once f + 1 of the num_replicas replicas have acknowledged it
            collect_quorum::<_, _, _, ()>(c_received_payloads, f + 1, num_replicas)
                .0
                .into_keyed()
        },
    )
    .entries()
    .map(q!(|(_virtual_client_id, (_output, latency))| latency));
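    // `latencies` is now a plain stream of per-transaction latency samples, with
    // the virtual client keys dropped.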

    // Create throughput/latency graphs
    let bench_results = compute_throughput_latency(
        clients,
        latencies,
        client_interval_millis,
        nondet!(/** bench */),
    );
    let aggregate_results =
        aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
    print_results(aggregate_results);
}

/// Generates an incrementing i32 payload for each virtual client ID, starting at 0
pub fn inc_i32_workload_generator<'a, Client>(
    ids_and_prev_payloads: KeyedStream<u32, Option<i32>, Cluster<'a, Client>, Unbounded, NoOrder>,
) -> KeyedStream<u32, i32, Cluster<'a, Client>, Unbounded, NoOrder> {
    ids_and_prev_payloads.map(q!(move |payload| {
        if let Some(counter) = payload {
            counter + 1
        } else {
            0
        }
    }))
}

#[cfg(test)]
mod tests {
    use dfir_lang::graph::WriteConfig;
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};

    #[cfg(stageleft_runtime)]
    use crate::cluster::paxos::{CorePaxos, PaxosConfig};

    const PAXOS_F: usize = 1;

    #[cfg(stageleft_runtime)]
    fn create_paxos<'a>(
        proposers: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Proposer>,
        acceptors: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Acceptor>,
        clients: &hydro_lang::location::Cluster<'a, super::Client>,
        client_aggregator: &hydro_lang::location::Process<'a, super::Aggregator>,
        replicas: &hydro_lang::location::Cluster<'a, crate::cluster::kv_replica::Replica>,
    ) {
        use hydro_lang::location::Location;
        use hydro_std::bench_client::pretty_print_bench_results;
        use stageleft::q;

        super::paxos_bench(
            1000,
            PAXOS_F,
            PAXOS_F + 1,
            CorePaxos {
                proposers: proposers.clone(),
                acceptors: acceptors.clone(),
                paxos_config: PaxosConfig {
                    f: 1,
                    i_am_leader_send_timeout: 5,
                    i_am_leader_check_timeout: 10,
                    i_am_leader_check_timeout_delay_multiplier: 15,
                },
            },
            clients,
            clients.singleton(q!(100usize)),
            client_aggregator,
            replicas,
            100,
            1000,
            pretty_print_bench_results,
        );
    }

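    // Snapshot test: captures the optimized Hydro IR and the per-cluster DFIR
    // mermaid graphs so that unintended changes to the generated dataflow are
    // caught in review.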
    #[test]
    fn paxos_ir() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();
        let replicas = builder.cluster();

        create_paxos(
            &proposers,
            &acceptors,
            &clients,
            &client_aggregator,
            &replicas,
        );
        let mut built = builder.with_default_optimize::<HydroDeploy>();

        hydro_lang::compile::ir::dbg_dedup_tee(|| {
            hydro_build_utils::assert_debug_snapshot!(built.ir());
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "proposer_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "acceptor_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&acceptors).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
    }

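    // End-to-end smoke test: deploys the full topology to localhost and waits
    // for two positive throughput samples from the aggregator's stdout.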
    #[tokio::test]
    async fn paxos_some_throughput() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();
        let replicas = builder.cluster();

        create_paxos(
            &proposers,
            &acceptors,
            &clients,
            &client_aggregator,
            &replicas,
        );
        let mut deployment = Deployment::new();

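        // Cluster sizing: f + 1 proposers and replicas, and 2f + 1 acceptors so
        // that majority quorums survive f failures.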
        let nodes = builder
            .with_cluster(
                &proposers,
                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(
                &acceptors,
                (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
            .with_process(
                &client_aggregator,
                TrybuildHost::new(deployment.Localhost()),
            )
            .with_cluster(
                &replicas,
                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let client_node = &nodes.get_process(&client_aggregator);
        let mut client_out = client_node.stdout_filter("Throughput:");

        deployment.start().await.unwrap();

        use std::str::FromStr;

        use regex::Regex;

        let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
        let mut found = 0;
        while let Some(line) = client_out.recv().await {
            if let Some(caps) = re.captures(&line)
                && let Ok(lower) = f64::from_str(&caps[1])
                && 0.0 < lower
            {
                println!("Found throughput lower-bound: {}", lower);
                found += 1;
                if found == 2 {
                    break;
                }
            }
        }
        assert_eq!(found, 2, "expected two positive throughput samples");
    }
}