1use hydro_lang::prelude::*;
2use hydro_std::bench_client::{
3 BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
4};
5
6use super::two_pc::{Coordinator, Participant};
7use crate::cluster::paxos_bench::inc_i32_workload_generator;
8use crate::cluster::two_pc::two_pc;
9
/// Marker type tagging the cluster of benchmark client nodes; `two_pc_bench`
/// takes it as `Cluster<'a, Client>`.
pub struct Client;
/// Marker type tagging the single process on which benchmark results are
/// aggregated; `two_pc_bench` takes it as `Process<'a, Aggregator>`.
pub struct Aggregator;
12
13#[expect(clippy::too_many_arguments, reason = "internal 2PC code // TODO")]
14pub fn two_pc_bench<'a>(
15 coordinator: &Process<'a, Coordinator>,
16 participants: &Cluster<'a, Participant>,
17 num_participants: usize,
18 clients: &Cluster<'a, Client>,
19 num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
20 client_aggregator: &Process<'a, Aggregator>,
21 client_interval_millis: u64,
22 aggregate_interval_millis: u64,
23 print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
24) {
25 let latencies = bench_client(
26 clients,
27 num_clients_per_node,
28 inc_i32_workload_generator,
29 |input| {
30 two_pc(
31 coordinator,
32 participants,
33 num_participants,
34 input
35 .entries()
36 .send(coordinator, TCP.fail_stop().bincode())
37 .entries(),
38 )
39 .demux(clients, TCP.fail_stop().bincode())
40 .into_keyed()
41 },
42 )
43 .entries()
44 .map(q!(|(_virtual_client_id, (_output, latency))| latency));
45
46 let bench_results = compute_throughput_latency(
48 clients,
49 latencies,
50 client_interval_millis,
51 nondet!(),
52 );
53 let aggregate_results =
54 aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
55 print_results(aggregate_results);
56}
57
58#[cfg(test)]
59mod tests {
60 use dfir_lang::graph::WriteConfig;
61 use hydro_deploy::Deployment;
62 use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
63 #[cfg(stageleft_runtime)]
64 use hydro_lang::location::{Cluster, Process};
65
66 #[cfg(stageleft_runtime)]
67 use crate::cluster::{
68 two_pc::{Coordinator, Participant},
69 two_pc_bench::{Aggregator, Client},
70 };
71
72 const NUM_PARTICIPANTS: usize = 3;
73
74 #[cfg(stageleft_runtime)]
75 fn create_two_pc<'a>(
76 coordinator: &Process<'a, Coordinator>,
77 participants: &Cluster<'a, Participant>,
78 clients: &Cluster<'a, Client>,
79 client_aggregator: &Process<'a, Aggregator>,
80 ) {
81 use hydro_lang::location::Location;
82 use hydro_std::bench_client::pretty_print_bench_results;
83 use stageleft::q;
84
85 super::two_pc_bench(
86 coordinator,
87 participants,
88 NUM_PARTICIPANTS,
89 clients,
90 clients.singleton(q!(100usize)),
91 client_aggregator,
92 100,
93 1000,
94 pretty_print_bench_results,
95 );
96 }
97
98 #[test]
99 fn two_pc_ir() {
100 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
101 let coordinator = builder.process();
102 let participants = builder.cluster();
103 let clients = builder.cluster();
104 let client_aggregator = builder.process();
105
106 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
107 let mut built = builder.with_default_optimize::<HydroDeploy>();
108
109 hydro_lang::compile::ir::dbg_dedup_tee(|| {
110 hydro_build_utils::assert_debug_snapshot!(built.ir());
111 });
112
113 let preview = built.preview_compile();
114 hydro_build_utils::insta::with_settings!({
115 snapshot_suffix => "coordinator_mermaid"
116 }, {
117 hydro_build_utils::assert_snapshot!(
118 preview.dfir_for(&coordinator).to_mermaid(&WriteConfig {
119 no_subgraphs: true,
120 no_pull_push: true,
121 no_handoffs: true,
122 op_text_no_imports: true,
123 ..WriteConfig::default()
124 })
125 );
126 });
127
128 let preview = built.preview_compile();
129 hydro_build_utils::insta::with_settings!({
130 snapshot_suffix => "participants_mermaid"
131 }, {
132 hydro_build_utils::assert_snapshot!(
133 preview.dfir_for(&participants).to_mermaid(&WriteConfig {
134 no_subgraphs: true,
135 no_pull_push: true,
136 no_handoffs: true,
137 op_text_no_imports: true,
138 ..WriteConfig::default()
139 })
140 );
141 });
142 }
143
144 #[tokio::test]
145 async fn two_pc_some_throughput() {
146 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
147 let coordinator = builder.process();
148 let participants = builder.cluster();
149 let clients = builder.cluster();
150 let client_aggregator = builder.process();
151
152 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
153 let mut deployment = Deployment::new();
154
155 let nodes = builder
156 .with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
157 .with_cluster(
158 &participants,
159 (0..NUM_PARTICIPANTS).map(|_| TrybuildHost::new(deployment.Localhost())),
160 )
161 .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
162 .with_process(
163 &client_aggregator,
164 TrybuildHost::new(deployment.Localhost()),
165 )
166 .deploy(&mut deployment);
167
168 deployment.deploy().await.unwrap();
169
170 let client_node = &nodes.get_process(&client_aggregator);
171 let client_out = client_node.stdout_filter("Throughput:");
172
173 deployment.start().await.unwrap();
174
175 use std::str::FromStr;
176
177 use regex::Regex;
178
179 let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
180 let mut found = 0;
181 let mut client_out = client_out;
182 while let Some(line) = client_out.recv().await {
183 if let Some(caps) = re.captures(&line)
184 && let Ok(lower) = f64::from_str(&caps[1])
185 && 0.0 < lower
186 {
187 println!("Found throughput lower-bound: {}", lower);
188 found += 1;
189 if found == 2 {
190 break;
191 }
192 }
193 }
194 }
195}