Skip to main content

hydro_test/cluster/
compute_pi.rs

1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::ExactlyOnce;
4use hydro_lang::prelude::*;
5
/// Type-level tag for the cluster location in [`compute_pi`] (`Cluster<'a, Worker>`).
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// to be used as a type parameter.
pub enum Worker {}
/// Type-level tag for the single-process location in [`compute_pi`] (`Process<'a, Leader>`).
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// to be used as a type parameter.
pub enum Leader {}
8
9pub fn compute_pi<'a>(
10    flow: &mut FlowBuilder<'a>,
11    batch_size: usize,
12) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
13    let cluster = flow.cluster();
14    let process = flow.process();
15
16    let trials = cluster
17        .tick()
18        .spin_batch(q!(batch_size))
19        .map(q!(|_| rand::random::<(f64, f64)>()))
20        .map(q!(|(x, y)| x * x + y * y < 1.0))
21        .fold(
22            q!(|| (0u64, 0u64)),
23            q!(|(inside, total), sample_inside| {
24                if sample_inside {
25                    *inside += 1;
26                }
27
28                *total += 1;
29            }),
30        )
31        .all_ticks();
32
33    let estimate = trials
34        .send(&process, TCP.fail_stop().bincode())
35        .values()
36        .reduce(q!(
37            |(inside, total), (inside_batch, total_batch)| {
38                *inside += inside_batch;
39                *total += total_batch;
40            },
41            commutative = manual_proof!(/** int addition is commutative */)
42        ));
43
44    estimate
45        .sample_every(
46            q!(Duration::from_secs(1)),
47            nondet!(/** intentional output */),
48        )
49        .assume_retries::<ExactlyOnce>(nondet!(/** extra logs due to duplicate samples are okay */))
50        .for_each(q!(|(inside, total)| {
51            println!(
52                "pi: {} ({} trials)",
53                4.0 * inside as f64 / total as f64,
54                total
55            );
56        }));
57
58    (cluster, process)
59}
60
#[cfg(test)]
mod tests {
    use hydro_lang::deploy::HydroDeploy;

    /// Snapshot test for the dataflow built by `compute_pi`.
    ///
    /// Records the debug form of the optimized IR, then one surface-syntax
    /// snapshot per entry returned by `all_dfir()`.
    #[test]
    fn compute_pi_ir() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        // Only the graph registered on `builder` matters here; the returned
        // location handles are not needed.
        let _ = super::compute_pi(&mut builder, 8192);
        let mut built = builder.with_default_optimize::<HydroDeploy>();

        hydro_build_utils::assert_debug_snapshot!(built.ir());

        for (location_key, ir) in built.preview_compile().all_dfir() {
            // The suffix gives each location its own snapshot file instead of
            // every iteration sharing one snapshot name.
            hydro_build_utils::insta::with_settings!({
                snapshot_suffix => format!("surface_graph_{location_key}"),
            }, {
                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}