hydro_test/cluster/
compute_pi.rs1use std::time::Duration;
2
3use hydro_lang::*;
4
5pub struct Worker {}
6pub struct Leader {}
7
8pub fn compute_pi<'a>(
9 flow: &FlowBuilder<'a>,
10 batch_size: usize,
11) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
12 let cluster = flow.cluster();
13 let process = flow.process();
14
15 let trials = cluster
16 .tick()
17 .spin_batch(q!(batch_size))
18 .map(q!(|_| rand::random::<(f64, f64)>()))
19 .map(q!(|(x, y)| x * x + y * y < 1.0))
20 .fold(
21 q!(|| (0u64, 0u64)),
22 q!(|(inside, total), sample_inside| {
23 if sample_inside {
24 *inside += 1;
25 }
26
27 *total += 1;
28 }),
29 )
30 .all_ticks();
31
32 let estimate = trials
33 .send_bincode(&process)
34 .values()
35 .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
36 *inside += inside_batch;
37 *total += total_batch;
38 }));
39
40 estimate
41 .sample_every(
42 q!(Duration::from_secs(1)),
43 nondet!(),
44 )
45 .for_each(q!(|(inside, total)| {
46 println!(
47 "pi: {} ({} trials)",
48 4.0 * inside as f64 / total as f64,
49 total
50 );
51 }));
52
53 (cluster, process)
54}
55
56#[cfg(test)]
57mod tests {
58 use hydro_lang::Location;
59 use hydro_lang::deploy::HydroDeploy;
60 use hydro_lang::rewrites::persist_pullup;
61 use hydro_optimize::decoupler;
62
63 struct DecoupledCluster {}
64
65 #[test]
66 fn compute_pi_ir() {
67 let builder = hydro_lang::FlowBuilder::new();
68 let _ = super::compute_pi(&builder, 8192);
69 let built = builder.with_default_optimize::<HydroDeploy>();
70
71 hydro_build_utils::assert_debug_snapshot!(built.ir());
72
73 for (id, ir) in built.preview_compile().all_dfir() {
74 hydro_build_utils::insta::with_settings!({
75 snapshot_suffix => format!("surface_graph_{id}"),
76 }, {
77 hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
78 });
79 }
80 }
81
82 #[test]
83 fn decoupled_compute_pi_ir() {
84 let builder = hydro_lang::FlowBuilder::new();
85 let (cluster, _) = super::compute_pi(&builder, 8192);
86 let decoupled_cluster = builder.cluster::<DecoupledCluster>();
87 let decoupler = decoupler::Decoupler {
88 output_to_decoupled_machine_after: vec![4],
89 output_to_original_machine_after: vec![],
90 place_on_decoupled_machine: vec![],
91 decoupled_location: decoupled_cluster.id().clone(),
92 orig_location: cluster.id().clone(),
93 };
94 let built = builder
95 .optimize_with(persist_pullup::persist_pullup)
96 .optimize_with(|roots| decoupler::decouple(roots, &decoupler))
97 .into_deploy::<HydroDeploy>();
98
99 hydro_build_utils::assert_debug_snapshot!(built.ir());
100
101 for (id, ir) in built.preview_compile().all_dfir() {
102 hydro_build_utils::insta::with_settings!({
103 snapshot_suffix => format!("surface_graph_{id}"),
104 }, {
105 hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
106 });
107 }
108 }
109}