1use hydro_lang::*;
2use hydro_std::bench_client::{bench_client, print_bench_results};
3
4use super::two_pc::{Coordinator, Participant};
5use crate::cluster::paxos_bench::inc_u32_workload_generator;
6use crate::cluster::two_pc::two_pc;
7
8pub struct Client;
9pub struct Aggregator;
10
11pub fn two_pc_bench<'a>(
12 num_clients_per_node: usize,
13 coordinator: &Process<'a, Coordinator>,
14 participants: &Cluster<'a, Participant>,
15 num_participants: usize,
16 clients: &Cluster<'a, Client>,
17 client_aggregator: &Process<'a, Aggregator>,
18) {
19 let bench_results = bench_client(
20 clients,
21 inc_u32_workload_generator,
22 |payloads| {
23 two_pc(
25 coordinator,
26 participants,
27 num_participants,
28 payloads.send_bincode(coordinator).entries(),
29 )
30 .demux_bincode(clients)
31 },
32 num_clients_per_node,
33 nondet!(),
34 );
35
36 print_bench_results(bench_results, client_aggregator, clients);
37}
38
39#[cfg(test)]
40mod tests {
41 use std::collections::{BTreeMap, HashMap};
42
43 use dfir_lang::graph::WriteConfig;
44 use hydro_deploy::Deployment;
45 use hydro_lang::Location;
46 use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
47 use hydro_lang::ir::deep_clone;
48 use hydro_lang::rewrites::persist_pullup::persist_pullup;
49 #[cfg(stageleft_runtime)]
50 use hydro_lang::{Cluster, Process};
51 use hydro_optimize::debug::name_to_id_map;
52 use hydro_optimize::partition_node_analysis::{nodes_to_partition, partitioning_analysis};
53 use hydro_optimize::partitioner::{Partitioner, partition};
54 use hydro_optimize::repair::{cycle_source_to_sink_input, inject_id, inject_location};
55
56 #[cfg(stageleft_runtime)]
57 use crate::cluster::{
58 two_pc::{Coordinator, Participant},
59 two_pc_bench::{Aggregator, Client},
60 };
61
62 const NUM_PARTICIPANTS: usize = 3;
63
64 #[cfg(stageleft_runtime)]
65 fn create_two_pc<'a>(
66 coordinator: &Process<'a, Coordinator>,
67 participants: &Cluster<'a, Participant>,
68 clients: &Cluster<'a, Client>,
69 client_aggregator: &Process<'a, Aggregator>,
70 ) {
71 super::two_pc_bench(
72 100,
73 coordinator,
74 participants,
75 NUM_PARTICIPANTS,
76 clients,
77 client_aggregator,
78 );
79 }
80
81 #[test]
82 fn two_pc_ir() {
83 let builder = hydro_lang::FlowBuilder::new();
84 let coordinator = builder.process();
85 let participants = builder.cluster();
86 let clients = builder.cluster();
87 let client_aggregator = builder.process();
88
89 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
90 let built = builder.with_default_optimize::<HydroDeploy>();
91
92 hydro_lang::ir::dbg_dedup_tee(|| {
93 hydro_build_utils::assert_debug_snapshot!(built.ir());
94 });
95
96 let preview = built.preview_compile();
97 hydro_build_utils::insta::with_settings!({
98 snapshot_suffix => "coordinator_mermaid"
99 }, {
100 hydro_build_utils::assert_snapshot!(
101 preview.dfir_for(&coordinator).to_mermaid(&WriteConfig {
102 no_subgraphs: true,
103 no_pull_push: true,
104 no_handoffs: true,
105 op_text_no_imports: true,
106 ..WriteConfig::default()
107 })
108 );
109 });
110
111 let preview = built.preview_compile();
112 hydro_build_utils::insta::with_settings!({
113 snapshot_suffix => "participants_mermaid"
114 }, {
115 hydro_build_utils::assert_snapshot!(
116 preview.dfir_for(&participants).to_mermaid(&WriteConfig {
117 no_subgraphs: true,
118 no_pull_push: true,
119 no_handoffs: true,
120 op_text_no_imports: true,
121 ..WriteConfig::default()
122 })
123 );
124 });
125 }
126
127 #[tokio::test]
128 async fn two_pc_some_throughput() {
129 let builder = hydro_lang::FlowBuilder::new();
130 let coordinator = builder.process();
131 let participants = builder.cluster();
132 let clients = builder.cluster();
133 let client_aggregator = builder.process();
134
135 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
136 let mut deployment = Deployment::new();
137
138 let nodes = builder
139 .with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
140 .with_cluster(
141 &participants,
142 (0..NUM_PARTICIPANTS).map(|_| TrybuildHost::new(deployment.Localhost())),
143 )
144 .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
145 .with_process(
146 &client_aggregator,
147 TrybuildHost::new(deployment.Localhost()),
148 )
149 .deploy(&mut deployment);
150
151 deployment.deploy().await.unwrap();
152
153 let client_node = &nodes.get_process(&client_aggregator);
154 let client_out = client_node.stdout_filter("Throughput:").await;
155
156 deployment.start().await.unwrap();
157
158 use std::str::FromStr;
159
160 use regex::Regex;
161
162 let re = Regex::new(r"Throughput: ([^ ]+) - ([^ ]+) - ([^ ]+) requests/s").unwrap();
163 let mut found = 0;
164 let mut client_out = client_out;
165 while let Some(line) = client_out.recv().await {
166 if let Some(caps) = re.captures(&line)
167 && let Ok(lower) = f64::from_str(&caps[1])
168 && 0.0 < lower
169 {
170 println!("Found throughput lower-bound: {}", lower);
171 found += 1;
172 if found == 2 {
173 break;
174 }
175 }
176 }
177 }
178
179 #[test]
180 fn two_pc_partition_coordinator() {
181 let builder = hydro_lang::FlowBuilder::new();
182 let coordinator = builder.process();
183 let partitioned_coordinator = builder.cluster::<()>();
184 let participants = builder.cluster();
185 let clients = builder.cluster();
186 let client_aggregator = builder.process();
187
188 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
189
190 let mut cycle_data = HashMap::new();
191 let built = builder
192 .optimize_with(persist_pullup)
193 .optimize_with(inject_id)
194 .optimize_with(|ir| {
195 cycle_data = cycle_source_to_sink_input(ir);
196 inject_location(ir, &cycle_data);
197 })
198 .into_deploy::<HydroDeploy>();
199 let mut ir = deep_clone(built.ir());
200
201 let coordinator_partitioning =
203 partitioning_analysis(&mut ir, &coordinator.id(), &cycle_data);
204 let name_to_id = name_to_id_map(&mut ir);
205 let c_prepare_id = *name_to_id.get("c_prepare").unwrap();
206 let c_votes_id = *name_to_id.get("c_votes").unwrap();
207 let c_commits_id = *name_to_id.get("c_commits").unwrap();
208 let expected_coordinator_partitioning = vec![BTreeMap::from([
210 (c_votes_id, vec!["1".to_string()]),
211 (c_commits_id, vec!["1".to_string()]),
212 ])];
213 let expected_coordinator_input_parents = BTreeMap::from([
214 (c_prepare_id, c_prepare_id - 1),
215 (c_votes_id, c_votes_id - 1),
216 (c_commits_id, c_commits_id - 1),
217 ]);
218 assert_eq!(
219 coordinator_partitioning,
220 Some((
221 expected_coordinator_partitioning,
222 expected_coordinator_input_parents
223 ))
224 );
225 let coordinator_nodes_to_partition = nodes_to_partition(coordinator_partitioning).unwrap();
226 let coordinator_partitioner = Partitioner {
227 nodes_to_partition: coordinator_nodes_to_partition,
228 num_partitions: 3,
229 location_id: coordinator.id().raw_id(),
230 new_cluster_id: Some(partitioned_coordinator.id().raw_id()),
231 };
232 partition(&mut ir, &coordinator_partitioner);
233
234 hydro_build_utils::assert_debug_snapshot!(&ir);
235 }
236
237 #[test]
238 fn two_pc_partition_participant() {
239 let builder = hydro_lang::FlowBuilder::new();
240 let coordinator = builder.process();
241 let participants = builder.cluster();
242 let clients = builder.cluster();
243 let client_aggregator = builder.process();
244
245 create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
246
247 let mut cycle_data = HashMap::new();
248 let built = builder
249 .optimize_with(persist_pullup)
250 .optimize_with(inject_id)
251 .optimize_with(|ir| {
252 cycle_data = cycle_source_to_sink_input(ir);
253 inject_location(ir, &cycle_data);
254 })
255 .into_deploy::<HydroDeploy>();
256 let mut ir = deep_clone(built.ir());
257
258 let participant_partitioning =
259 partitioning_analysis(&mut ir, &participants.id(), &cycle_data);
260 let name_to_id = name_to_id_map(&mut ir);
262 let p_prepare_id = *name_to_id.get("p_prepare").unwrap();
263 let p_commits_id = *name_to_id.get("p_commits").unwrap();
264 let expected_participant_partitionings = vec![];
266 let expected_participant_input_parents = BTreeMap::from([
267 (p_prepare_id, p_prepare_id - 1),
268 (p_commits_id, p_commits_id - 1),
269 ]);
270 assert_eq!(
271 participant_partitioning,
272 Some((
273 expected_participant_partitionings,
274 expected_participant_input_parents
275 ))
276 );
277 let participant_nodes_to_partition = nodes_to_partition(participant_partitioning).unwrap();
278 let participant_partitioner = Partitioner {
279 nodes_to_partition: participant_nodes_to_partition,
280 num_partitions: 3,
281 location_id: participants.id().raw_id(),
282 new_cluster_id: None,
283 };
284 partition(&mut ir, &participant_partitioner);
285
286 hydro_build_utils::assert_debug_snapshot!(&ir);
287 }
288}