// hydro_test/cluster/two_pc_bench.rs

1use hydro_lang::prelude::*;
2use hydro_std::bench_client::{
3    BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
4};
5
6use super::two_pc::{Coordinator, Participant};
7use crate::cluster::paxos_bench::inc_i32_workload_generator;
8use crate::cluster::two_pc::two_pc;
9
/// Marker type for the cluster of benchmark clients that issue 2PC requests.
pub struct Client;
/// Marker type for the process that aggregates benchmark results from all clients.
pub struct Aggregator;
12
/// Runs an end-to-end benchmark of the two-phase-commit implementation in
/// [`super::two_pc::two_pc`].
///
/// Each node in `clients` drives `num_clients_per_node` virtual clients whose
/// request payloads come from [`inc_i32_workload_generator`]. Requests are sent
/// to `coordinator` over a fail-stop TCP link with bincode serialization, run
/// through the 2PC protocol with `participants` / `num_participants`, and the
/// outputs are demuxed back to the client that issued them. Latency samples are
/// collected at `client_interval_millis` intervals (see
/// [`compute_throughput_latency`]), combined on `client_aggregator` every
/// `aggregate_interval_millis` ms, and the aggregate stream is handed to
/// `print_results` exactly once.
#[expect(clippy::too_many_arguments, reason = "internal 2PC code // TODO")]
pub fn two_pc_bench<'a>(
    coordinator: &Process<'a, Coordinator>,
    participants: &Cluster<'a, Participant>,
    num_participants: usize,
    clients: &Cluster<'a, Client>,
    num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
    client_aggregator: &Process<'a, Aggregator>,
    client_interval_millis: u64,
    aggregate_interval_millis: u64,
    print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
) {
    // `bench_client` drives the virtual clients; the closure defines one round
    // trip: client -> coordinator -> two_pc -> back to the originating client.
    let latencies = bench_client(
        clients,
        num_clients_per_node,
        inc_i32_workload_generator,
        |input| {
            two_pc(
                coordinator,
                participants,
                num_participants,
                input
                    .entries()
                    // Ship each request to the coordinator process.
                    .send(coordinator, TCP.fail_stop().bincode())
                    .entries(),
            )
            // Route each protocol output back to the client that issued it.
            .demux(clients, TCP.fail_stop().bincode())
            .into_keyed()
        },
    )
    .entries()
    // Keep only the latency half of each (output, latency) measurement.
    .map(q!(|(_virtual_client_id, (_output, latency))| latency));

    // Create throughput/latency graphs
    let bench_results = compute_throughput_latency(
        clients,
        latencies,
        client_interval_millis,
        nondet!(/** bench */),
    );
    // Fan per-client measurements into the single aggregator process, then hand
    // the combined result to the caller-supplied reporter.
    let aggregate_results =
        aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
    print_results(aggregate_results);
}
57
#[cfg(test)]
mod tests {
    use dfir_lang::graph::WriteConfig;
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
    #[cfg(stageleft_runtime)]
    use hydro_lang::location::{Cluster, Process};

    #[cfg(stageleft_runtime)]
    use crate::cluster::{
        two_pc::{Coordinator, Participant},
        two_pc_bench::{Aggregator, Client},
    };

    /// Number of 2PC participants used by both tests below.
    const NUM_PARTICIPANTS: usize = 3;

    /// Wires up the benchmark with fixed test parameters: 100 virtual clients
    /// per node, 100 ms client sampling interval, 1000 ms aggregation
    /// interval, and results pretty-printed via `pretty_print_bench_results`.
    #[cfg(stageleft_runtime)]
    fn create_two_pc<'a>(
        coordinator: &Process<'a, Coordinator>,
        participants: &Cluster<'a, Participant>,
        clients: &Cluster<'a, Client>,
        client_aggregator: &Process<'a, Aggregator>,
    ) {
        use hydro_lang::location::Location;
        use hydro_std::bench_client::pretty_print_bench_results;
        use stageleft::q;

        super::two_pc_bench(
            coordinator,
            participants,
            NUM_PARTICIPANTS,
            clients,
            clients.singleton(q!(100usize)),
            client_aggregator,
            100,
            1000,
            pretty_print_bench_results,
        );
    }

    /// Snapshot test: builds the benchmark flow and snapshots its IR plus the
    /// coordinator and participant DFIR graphs (rendered as mermaid) so that
    /// unintended plan changes show up as snapshot diffs in review.
    #[test]
    fn two_pc_ir() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let coordinator = builder.process();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
        let mut built = builder.with_default_optimize::<HydroDeploy>();

        // Deduplicate tee nodes in the debug output so the IR snapshot is
        // stable across runs (presumably what `dbg_dedup_tee` does — the name
        // suggests so; confirm against hydro_lang docs).
        hydro_lang::compile::ir::dbg_dedup_tee(|| {
            hydro_build_utils::assert_debug_snapshot!(built.ir());
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "coordinator_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                // Suppress subgraph/handoff/import noise so the snapshot only
                // tracks the logical operator graph.
                preview.dfir_for(&coordinator).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "participants_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&participants).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
    }

    /// Integration test: deploys the whole benchmark on localhost and watches
    /// the aggregator's stdout until it has reported a positive throughput
    /// twice, demonstrating the deployed system makes forward progress.
    #[tokio::test]
    async fn two_pc_some_throughput() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let coordinator = builder.process();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
        let mut deployment = Deployment::new();

        // Place every process and cluster member on localhost.
        let nodes = builder
            .with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
            .with_cluster(
                &participants,
                (0..NUM_PARTICIPANTS).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
            .with_process(
                &client_aggregator,
                TrybuildHost::new(deployment.Localhost()),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Subscribe to the aggregator's stdout, filtered to throughput lines,
        // before starting the deployment so no early output is missed.
        let client_node = &nodes.get_process(&client_aggregator);
        let client_out = client_node.stdout_filter("Throughput:");

        deployment.start().await.unwrap();

        use std::str::FromStr;

        use regex::Regex;

        // Matches e.g. "Throughput: 123.4 requests/s"; capture 1 is the number.
        let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
        let mut found = 0;
        let mut client_out = client_out;
        // Stop after two strictly-positive throughput readings.
        // NOTE(review): if the stdout stream closes before two positive
        // readings arrive, this loop exits and the test passes without any
        // assertion — confirm whether it should assert `found == 2` here.
        while let Some(line) = client_out.recv().await {
            if let Some(caps) = re.captures(&line)
                && let Ok(lower) = f64::from_str(&caps[1])
                && 0.0 < lower
            {
                println!("Found throughput lower-bound: {}", lower);
                found += 1;
                if found == 2 {
                    break;
                }
            }
        }
    }
}
195}