hydro_test/cluster/
compute_pi.rs

1use std::time::Duration;
2
3use hydro_lang::*;
4
/// Marker type tagging the cluster location returned by [`compute_pi`];
/// the random sampling stage of the flow is built on that cluster.
pub struct Worker {}
/// Marker type tagging the single process location returned by [`compute_pi`];
/// the per-batch counts are sent to that process, combined, and printed there.
pub struct Leader {}
7
8pub fn compute_pi<'a>(
9    flow: &FlowBuilder<'a>,
10    batch_size: usize,
11) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
12    let cluster = flow.cluster();
13    let process = flow.process();
14
15    let trials = cluster
16        .tick()
17        .spin_batch(q!(batch_size))
18        .map(q!(|_| rand::random::<(f64, f64)>()))
19        .map(q!(|(x, y)| x * x + y * y < 1.0))
20        .fold(
21            q!(|| (0u64, 0u64)),
22            q!(|(inside, total), sample_inside| {
23                if sample_inside {
24                    *inside += 1;
25                }
26
27                *total += 1;
28            }),
29        )
30        .all_ticks();
31
32    let estimate = trials
33        .send_bincode_anonymous(&process)
34        .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
35            *inside += inside_batch;
36            *total += total_batch;
37        }));
38
39    unsafe {
40        // SAFETY: intentional non-determinism
41        estimate.sample_every(q!(Duration::from_secs(1)))
42    }
43    .for_each(q!(|(inside, total)| {
44        println!(
45            "pi: {} ({} trials)",
46            4.0 * inside as f64 / total as f64,
47            total
48        );
49    }));
50
51    (cluster, process)
52}
53
#[cfg(test)]
mod tests {
    use hydro_lang::Location;
    use hydro_lang::deploy::DeployRuntime;
    use hydro_lang::rewrites::decoupler::{Decoupler, decouple};
    use hydro_lang::rewrites::persist_pullup::persist_pullup;
    use stageleft::RuntimeData;

    /// Marker type for the additional cluster created by the decoupling test.
    struct DecoupledCluster {}

    /// Snapshots the default-optimized IR of `compute_pi`, followed by the
    /// surface syntax of every compiled DFIR graph (one snapshot per id).
    #[test]
    fn compute_pi_ir() {
        let flow = hydro_lang::FlowBuilder::new();
        let _ = super::compute_pi(&flow, 8192);
        let built = flow.with_default_optimize::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        // The snapshot macros stay inline in this function (not in a shared
        // helper) so that the snapshot names keep resolving as before.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }

    /// Same as `compute_pi_ir`, but runs `persist_pullup` and then the
    /// decoupler rewrite (targeting node index 4 and a fresh
    /// `DecoupledCluster`) before taking the snapshots.
    #[test]
    fn decoupled_compute_pi_ir() {
        let flow = hydro_lang::FlowBuilder::new();
        let _ = super::compute_pi(&flow, 8192);

        // Created after `compute_pi` so the locations are registered in the
        // same order as before.
        let target_cluster = flow.cluster::<DecoupledCluster>();
        let decouple_config = Decoupler {
            nodes_to_decouple: vec![4],
            new_location: target_cluster.id().clone(),
        };

        let built = flow
            .optimize_with(persist_pullup)
            .optimize_with(|leaves| decouple(leaves, &decouple_config))
            .into_deploy::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}