Skip to main content

hydro_test/cluster/
map_reduce.rs

1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::prelude::*;
3
/// Marker type tagging the single process that `map_reduce` returns as
/// `Process<'a, Leader>`. It carries no data and is only used as a type parameter.
pub struct Leader {}
/// Marker type tagging the cluster that `map_reduce` returns as
/// `Cluster<'a, Worker>`. It carries no data and is only used as a type parameter.
pub struct Worker {}
6
7pub fn map_reduce<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
8    let process = flow.process();
9    let cluster = flow.cluster();
10
11    let words = process
12        .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
13        .map(q!(|s| s.to_owned()));
14
15    let partitioned_words = words
16        .round_robin(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
17        .map(q!(|string| (string, ())))
18        .into_keyed();
19
20    let batches = partitioned_words
21        .batch(
22            &cluster.tick(),
23            nondet!(/** addition is associative so we can batch reduce */),
24        )
25        .fold(q!(|| 0), q!(|count, _| *count += 1))
26        .entries()
27        .inspect(q!(|(string, count)| println!(
28            "partition count: {} - {}",
29            string, count
30        )))
31        .all_ticks()
32        .send(&process, TCP.fail_stop().bincode())
33        .values();
34
35    let reduced = batches.into_keyed().reduce(q!(
36        |total, count| *total += count,
37        commutative = manual_proof!(/** integer add is commutative */)
38    ));
39
40    reduced
41        .snapshot(&process.tick(), nondet!(/** intentional output */))
42        .entries()
43        .all_ticks()
44        .assume_ordering::<TotalOrder>(nondet!(/** unordered logs across keys are okay */))
45        .for_each(q!(|(string, count)| println!("{}: {}", string, count)));
46
47    (process, cluster)
48}
49
/// Snapshot tests for the `map_reduce` flow.
#[cfg(test)]
mod tests {
    use hydro_lang::deploy::HydroDeploy;

    /// Builds the `map_reduce` flow, applies the default optimizations for
    /// `HydroDeploy`, and snapshots both the resulting IR and the surface
    /// syntax of every DFIR graph returned by `preview_compile().all_dfir()`.
    #[test]
    fn map_reduce_ir() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        // The returned locations are not needed; only the flow registered on
        // `builder` is inspected.
        let _ = super::map_reduce(&mut builder);
        let mut built = builder.with_default_optimize::<HydroDeploy>();

        hydro_build_utils::assert_debug_snapshot!(built.ir());

        for (location_key, ir) in built.preview_compile().all_dfir() {
            // One snapshot per location: the suffix embeds `location_key` so
            // the snapshots taken inside this loop get distinct names.
            hydro_build_utils::insta::with_settings!({
                snapshot_suffix => format!("surface_graph_{location_key}")
            }, {
                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}