// hydro_test/cluster/compute_pi.rs
use std::time::Duration;

use hydro_lang::*;

/// Marker type tagging the cluster that `compute_pi` returns as `Cluster<'a, Worker>`.
pub struct Worker {}
/// Marker type tagging the process that `compute_pi` returns as `Process<'a, Leader>`.
pub struct Leader {}

8pub fn compute_pi<'a>(
9 flow: &FlowBuilder<'a>,
10 batch_size: usize,
11) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
12 let cluster = flow.cluster();
13 let process = flow.process();
14
15 let trials = cluster
16 .tick()
17 .spin_batch(q!(batch_size))
18 .map(q!(|_| rand::random::<(f64, f64)>()))
19 .map(q!(|(x, y)| x * x + y * y < 1.0))
20 .fold(
21 q!(|| (0u64, 0u64)),
22 q!(|(inside, total), sample_inside| {
23 if sample_inside {
24 *inside += 1;
25 }
26
27 *total += 1;
28 }),
29 )
30 .all_ticks();
31
32 let estimate = trials
33 .send_bincode_anonymous(&process)
34 .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
35 *inside += inside_batch;
36 *total += total_batch;
37 }));
38
39 unsafe {
40 estimate.sample_every(q!(Duration::from_secs(1)))
42 }
43 .for_each(q!(|(inside, total)| {
44 println!(
45 "pi: {} ({} trials)",
46 4.0 * inside as f64 / total as f64,
47 total
48 );
49 }));
50
51 (cluster, process)
52}
53
/// Snapshot tests for the IR and the generated DFIR surface syntax of
/// `compute_pi`, with and without the decoupling rewrite.
#[cfg(test)]
mod tests {
    use hydro_lang::Location;
    use hydro_lang::deploy::DeployRuntime;
    use hydro_lang::rewrites::{decoupler, persist_pullup};
    use stageleft::RuntimeData;

    /// Marker type for the extra cluster used as the decoupling target.
    struct DecoupledCluster {}

    /// Snapshots the default-optimized IR and each compiled DFIR graph.
    #[test]
    fn compute_pi_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let _ = super::compute_pi(&builder, 8192);
        let built = builder.with_default_optimize::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        // One surface-syntax snapshot per compiled graph, told apart by the
        // graph id in the snapshot suffix.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }

    /// Snapshots the IR and each compiled DFIR graph after applying
    /// `persist_pullup` followed by the decoupler rewrite.
    #[test]
    fn decoupled_compute_pi_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let _ = super::compute_pi(&builder, 8192);
        let decoupled_cluster = builder.cluster::<DecoupledCluster>();
        let decoupler = decoupler::Decoupler {
            // NOTE(review): presumably the IR node id of an operator in the
            // `compute_pi` pipeline; confirm against the IR snapshot, and
            // re-check it whenever the pipeline's operator order changes.
            nodes_to_decouple: vec![4],
            new_location: decoupled_cluster.id().clone(),
        };
        let built = builder
            .optimize_with(persist_pullup::persist_pullup)
            .optimize_with(|leaves| decoupler::decouple(leaves, &decoupler))
            .into_deploy::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        // One surface-syntax snapshot per compiled graph, told apart by the
        // graph id in the snapshot suffix.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}