hydro_test/cluster/
compute_pi.rs1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::ExactlyOnce;
4use hydro_lang::prelude::*;
5
6pub enum Worker {}
7pub enum Leader {}
8
9pub fn compute_pi<'a>(
10 flow: &mut FlowBuilder<'a>,
11 batch_size: usize,
12) -> (Cluster<'a, Worker>, Process<'a, Leader>) {
13 let cluster = flow.cluster();
14 let process = flow.process();
15
16 let trials = cluster
17 .tick()
18 .spin_batch(q!(batch_size))
19 .map(q!(|_| rand::random::<(f64, f64)>()))
20 .map(q!(|(x, y)| x * x + y * y < 1.0))
21 .fold(
22 q!(|| (0u64, 0u64)),
23 q!(|(inside, total), sample_inside| {
24 if sample_inside {
25 *inside += 1;
26 }
27
28 *total += 1;
29 }),
30 )
31 .all_ticks();
32
33 let estimate = trials
34 .send(&process, TCP.fail_stop().bincode())
35 .values()
36 .reduce(q!(
37 |(inside, total), (inside_batch, total_batch)| {
38 *inside += inside_batch;
39 *total += total_batch;
40 },
41 commutative = manual_proof!()
42 ));
43
44 estimate
45 .sample_every(
46 q!(Duration::from_secs(1)),
47 nondet!(),
48 )
49 .assume_retries::<ExactlyOnce>(nondet!())
50 .for_each(q!(|(inside, total)| {
51 println!(
52 "pi: {} ({} trials)",
53 4.0 * inside as f64 / total as f64,
54 total
55 );
56 }));
57
58 (cluster, process)
59}
60
61#[cfg(test)]
62mod tests {
63 use hydro_lang::deploy::HydroDeploy;
64
65 #[test]
66 fn compute_pi_ir() {
67 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
68 let _ = super::compute_pi(&mut builder, 8192);
69 let mut built = builder.with_default_optimize::<HydroDeploy>();
70
71 hydro_build_utils::assert_debug_snapshot!(built.ir());
72
73 for (location_key, ir) in built.preview_compile().all_dfir() {
74 hydro_build_utils::insta::with_settings!({
75 snapshot_suffix => format!("surface_graph_{location_key}"),
76 }, {
77 hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
78 });
79 }
80 }
81}