hydro_test/cluster/two_pc_bench.rs

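//! Benchmark harness and partitioning tests for the two-phase commit
//! implementation in `super::two_pc`.
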
use hydro_lang::*;
use hydro_std::bench_client::{bench_client, print_bench_results};

use super::two_pc::{Coordinator, Participant};
use crate::cluster::paxos_bench::inc_u32_workload_generator;
use crate::cluster::two_pc::two_pc;

pub struct Client;
pub struct Aggregator;

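/// Benchmarks two-phase commit end-to-end: clients generate an incrementing
/// `u32` workload, each payload is committed through the coordinator and
/// participants, and committed results are demuxed back to the originating
/// clients; benchmark results are printed via the `client_aggregator`.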
pub fn two_pc_bench<'a>(
    num_clients_per_node: usize,
    coordinator: &Process<'a, Coordinator>,
    participants: &Cluster<'a, Participant>,
    num_participants: usize,
    clients: &Cluster<'a, Client>,
    client_aggregator: &Process<'a, Aggregator>,
) {
    let bench_results = bench_client(
        clients,
        inc_u32_workload_generator,
        |payloads| {
            // Send committed requests back to the original client
            two_pc(
                coordinator,
                participants,
                num_participants,
                payloads.send_bincode(coordinator).entries(),
            )
            .demux_bincode(clients)
        },
        num_clients_per_node,
        nondet!(/** bench */),
    );

    print_bench_results(bench_results, client_aggregator, clients);
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};

    use dfir_lang::graph::WriteConfig;
    use hydro_deploy::Deployment;
    use hydro_lang::Location;
    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
    use hydro_lang::ir::deep_clone;
    use hydro_lang::rewrites::persist_pullup::persist_pullup;
    #[cfg(stageleft_runtime)]
    use hydro_lang::{Cluster, Process};
    use hydro_optimize::debug::name_to_id_map;
    use hydro_optimize::partition_node_analysis::{nodes_to_partition, partitioning_analysis};
    use hydro_optimize::partitioner::{Partitioner, partition};
    use hydro_optimize::repair::{cycle_source_to_sink_input, inject_id, inject_location};

    #[cfg(stageleft_runtime)]
    use crate::cluster::{
        two_pc::{Coordinator, Participant},
        two_pc_bench::{Aggregator, Client},
    };

    const NUM_PARTICIPANTS: usize = 3;

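    // Helper: instantiates the benchmark with 100 clients per node and
    // `NUM_PARTICIPANTS` participants.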
    #[cfg(stageleft_runtime)]
    fn create_two_pc<'a>(
        coordinator: &Process<'a, Coordinator>,
        participants: &Cluster<'a, Participant>,
        clients: &Cluster<'a, Client>,
        client_aggregator: &Process<'a, Aggregator>,
    ) {
        super::two_pc_bench(
            100,
            coordinator,
            participants,
            NUM_PARTICIPANTS,
            clients,
            client_aggregator,
        );
    }

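    // Snapshot-tests the optimized IR, plus the generated DFIR graphs for the
    // coordinator and the participants rendered as Mermaid diagrams.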
    #[test]
    fn two_pc_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let coordinator = builder.process();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
        let built = builder.with_default_optimize::<HydroDeploy>();

        hydro_lang::ir::dbg_dedup_tee(|| {
            hydro_build_utils::assert_debug_snapshot!(built.ir());
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "coordinator_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&coordinator).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });

        let preview = built.preview_compile();
        hydro_build_utils::insta::with_settings!({
            snapshot_suffix => "participants_mermaid"
        }, {
            hydro_build_utils::assert_snapshot!(
                preview.dfir_for(&participants).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_pull_push: true,
                    no_handoffs: true,
                    op_text_no_imports: true,
                    ..WriteConfig::default()
                })
            );
        });
    }

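    // Deploys the full benchmark on localhost and asserts that the client
    // aggregator reports a positive throughput lower bound twice.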
    #[tokio::test]
    async fn two_pc_some_throughput() {
        let builder = hydro_lang::FlowBuilder::new();
        let coordinator = builder.process();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);
        let mut deployment = Deployment::new();

        let nodes = builder
            .with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
            .with_cluster(
                &participants,
                (0..NUM_PARTICIPANTS).map(|_| TrybuildHost::new(deployment.Localhost())),
            )
            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
            .with_process(
                &client_aggregator,
                TrybuildHost::new(deployment.Localhost()),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let client_node = nodes.get_process(&client_aggregator);
        let mut client_out = client_node.stdout_filter("Throughput:").await;

        deployment.start().await.unwrap();

        use std::str::FromStr;

        use regex::Regex;

        // Parse "Throughput: <x> - <y> - <z> requests/s" lines; the first
        // number is the throughput lower bound.
        let re = Regex::new(r"Throughput: ([^ ]+) - ([^ ]+) - ([^ ]+) requests/s").unwrap();
        let mut found = 0;
        while let Some(line) = client_out.recv().await {
            if let Some(caps) = re.captures(&line)
                && let Ok(lower) = f64::from_str(&caps[1])
                && 0.0 < lower
            {
                println!("Found throughput lower-bound: {}", lower);
                found += 1;
                if found == 2 {
                    break;
                }
            }
        }
        // Fail rather than pass vacuously if the output stream ends early.
        assert_eq!(found, 2, "expected two positive throughput readings");
    }

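    // Runs partitioning analysis on the coordinator, checks the expected
    // partitionings, and rewrites the IR to split it across a new 3-node cluster.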
    #[test]
    fn two_pc_partition_coordinator() {
        let builder = hydro_lang::FlowBuilder::new();
        let coordinator = builder.process();
        let partitioned_coordinator = builder.cluster::<()>();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);

        let mut cycle_data = HashMap::new();
        let built = builder
            .optimize_with(persist_pullup)
            .optimize_with(inject_id)
            .optimize_with(|ir| {
                cycle_data = cycle_source_to_sink_input(ir);
                inject_location(ir, &cycle_data);
            })
            .into_deploy::<HydroDeploy>();
        let mut ir = deep_clone(built.ir());

        // Coordinator
        let coordinator_partitioning =
            partitioning_analysis(&mut ir, &coordinator.id(), &cycle_data);
        let name_to_id = name_to_id_map(&mut ir);
        let c_prepare_id = *name_to_id.get("c_prepare").unwrap();
        let c_votes_id = *name_to_id.get("c_votes").unwrap();
        let c_commits_id = *name_to_id.get("c_commits").unwrap();
        // "1" is the partitioning index of those inputs: the client sends
        // (sender_id, payload) to the coordinator, so we can partition on the entire payload.
        let expected_coordinator_partitioning = vec![BTreeMap::from([
            (c_votes_id, vec!["1".to_string()]),
            (c_commits_id, vec!["1".to_string()]),
        ])];
        let expected_coordinator_input_parents = BTreeMap::from([
            (c_prepare_id, c_prepare_id - 1),
            (c_votes_id, c_votes_id - 1),
            (c_commits_id, c_commits_id - 1),
        ]);
        assert_eq!(
            coordinator_partitioning,
            Some((
                expected_coordinator_partitioning,
                expected_coordinator_input_parents
            ))
        );
        let coordinator_nodes_to_partition = nodes_to_partition(coordinator_partitioning).unwrap();
        let coordinator_partitioner = Partitioner {
            nodes_to_partition: coordinator_nodes_to_partition,
            num_partitions: 3,
            location_id: coordinator.id().raw_id(),
            new_cluster_id: Some(partitioned_coordinator.id().raw_id()),
        };
        partition(&mut ir, &coordinator_partitioner);

        hydro_build_utils::assert_debug_snapshot!(&ir);
    }

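    // Same analysis for the participants: they only execute maps, so any
    // partitioning is valid, and the existing cluster is partitioned in place
    // (no new cluster id).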
    #[test]
    fn two_pc_partition_participant() {
        let builder = hydro_lang::FlowBuilder::new();
        let coordinator = builder.process();
        let participants = builder.cluster();
        let clients = builder.cluster();
        let client_aggregator = builder.process();

        create_two_pc(&coordinator, &participants, &clients, &client_aggregator);

        let mut cycle_data = HashMap::new();
        let built = builder
            .optimize_with(persist_pullup)
            .optimize_with(inject_id)
            .optimize_with(|ir| {
                cycle_data = cycle_source_to_sink_input(ir);
                inject_location(ir, &cycle_data);
            })
            .into_deploy::<HydroDeploy>();
        let mut ir = deep_clone(built.ir());

        let participant_partitioning =
            partitioning_analysis(&mut ir, &participants.id(), &cycle_data);
        // Recalculate node IDs since they've changed as well
        let name_to_id = name_to_id_map(&mut ir);
        let p_prepare_id = *name_to_id.get("p_prepare").unwrap();
        let p_commits_id = *name_to_id.get("p_commits").unwrap();
        // Participants can partition on ANYTHING, since they only execute maps
        let expected_participant_partitionings = vec![];
        let expected_participant_input_parents = BTreeMap::from([
            (p_prepare_id, p_prepare_id - 1),
            (p_commits_id, p_commits_id - 1),
        ]);
        assert_eq!(
            participant_partitioning,
            Some((
                expected_participant_partitionings,
                expected_participant_input_parents
            ))
        );
        let participant_nodes_to_partition = nodes_to_partition(participant_partitioning).unwrap();
        let participant_partitioner = Partitioner {
            nodes_to_partition: participant_nodes_to_partition,
            num_partitions: 3,
            location_id: participants.id().raw_id(),
            new_cluster_id: None,
        };
        partition(&mut ir, &participant_partitioner);

        hydro_build_utils::assert_debug_snapshot!(&ir);
    }
}