hydro_test/cluster/
map_reduce.rs

1use hydro_lang::*;
2
3pub struct Leader {}
4pub struct Worker {}
5
6pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
7    let process = flow.process();
8    let cluster = flow.cluster();
9
10    let words = process
11        .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
12        .map(q!(|s| s.to_string()));
13
14    let partitioned_words = words
15        .round_robin_bincode(&cluster, nondet!(/** test */))
16        .map(q!(|string| (string, ())))
17        .into_keyed();
18
19    let batches = partitioned_words
20        .batch(
21            &cluster.tick(),
22            nondet!(/** addition is associative so we can batch reduce */),
23        )
24        .fold(q!(|| 0), q!(|count, _| *count += 1))
25        .entries()
26        .inspect(q!(|(string, count)| println!(
27            "partition count: {} - {}",
28            string, count
29        )))
30        .all_ticks()
31        .send_bincode(&process)
32        .values();
33
34    let reduced = batches
35        .into_keyed()
36        .reduce_commutative(q!(|total, count| *total += count));
37
38    reduced
39        .snapshot(&process.tick(), nondet!(/** intentional output */))
40        .entries()
41        .all_ticks()
42        .for_each(q!(|(string, count)| println!("{}: {}", string, count)));
43
44    (process, cluster)
45}
46
47#[cfg(test)]
48mod tests {
49    use hydro_lang::deploy::HydroDeploy;
50
51    #[test]
52    fn map_reduce_ir() {
53        let builder = hydro_lang::FlowBuilder::new();
54        let _ = super::map_reduce(&builder);
55        let built = builder.with_default_optimize::<HydroDeploy>();
56
57        hydro_build_utils::assert_debug_snapshot!(built.ir());
58
59        for (id, ir) in built.preview_compile().all_dfir() {
60            hydro_build_utils::insta::with_settings!({
61                snapshot_suffix => format!("surface_graph_{id}")
62            }, {
63                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
64            });
65        }
66    }
67}