hydro_test/cluster/
compute_pi.rs

1use std::time::Duration;
2
3use hydro_lang::*;
4
5pub struct Worker {}
6pub struct Leader {}
7
8pub fn compute_pi<'a>(
9    flow: &FlowBuilder<'a>,
10    batch_size: usize,
11) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
12    let cluster = flow.cluster();
13    let process = flow.process();
14
15    let trials = cluster
16        .tick()
17        .spin_batch(q!(batch_size))
18        .map(q!(|_| rand::random::<(f64, f64)>()))
19        .map(q!(|(x, y)| x * x + y * y < 1.0))
20        .fold(
21            q!(|| (0u64, 0u64)),
22            q!(|(inside, total), sample_inside| {
23                if sample_inside {
24                    *inside += 1;
25                }
26
27                *total += 1;
28            }),
29        )
30        .all_ticks();
31
32    let estimate = trials
33        .send_bincode(&process)
34        .values()
35        .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
36            *inside += inside_batch;
37            *total += total_batch;
38        }));
39
40    estimate
41        .sample_every(
42            q!(Duration::from_secs(1)),
43            nondet!(/** intentional output */),
44        )
45        .for_each(q!(|(inside, total)| {
46            println!(
47                "pi: {} ({} trials)",
48                4.0 * inside as f64 / total as f64,
49                total
50            );
51        }));
52
53    (cluster, process)
54}
55
56#[cfg(test)]
57mod tests {
58    use hydro_lang::Location;
59    use hydro_lang::deploy::HydroDeploy;
60    use hydro_lang::rewrites::persist_pullup;
61    use hydro_optimize::decoupler;
62
63    struct DecoupledCluster {}
64
65    #[test]
66    fn compute_pi_ir() {
67        let builder = hydro_lang::FlowBuilder::new();
68        let _ = super::compute_pi(&builder, 8192);
69        let built = builder.with_default_optimize::<HydroDeploy>();
70
71        hydro_build_utils::assert_debug_snapshot!(built.ir());
72
73        for (id, ir) in built.preview_compile().all_dfir() {
74            hydro_build_utils::insta::with_settings!({
75                snapshot_suffix => format!("surface_graph_{id}"),
76            }, {
77                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
78            });
79        }
80    }
81
82    #[test]
83    fn decoupled_compute_pi_ir() {
84        let builder = hydro_lang::FlowBuilder::new();
85        let (cluster, _) = super::compute_pi(&builder, 8192);
86        let decoupled_cluster = builder.cluster::<DecoupledCluster>();
87        let decoupler = decoupler::Decoupler {
88            output_to_decoupled_machine_after: vec![4],
89            output_to_original_machine_after: vec![],
90            place_on_decoupled_machine: vec![],
91            decoupled_location: decoupled_cluster.id().clone(),
92            orig_location: cluster.id().clone(),
93        };
94        let built = builder
95            .optimize_with(persist_pullup::persist_pullup)
96            .optimize_with(|roots| decoupler::decouple(roots, &decoupler))
97            .into_deploy::<HydroDeploy>();
98
99        hydro_build_utils::assert_debug_snapshot!(built.ir());
100
101        for (id, ir) in built.preview_compile().all_dfir() {
102            hydro_build_utils::insta::with_settings!({
103                snapshot_suffix => format!("surface_graph_{id}"),
104            }, {
105                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
106            });
107        }
108    }
109}