// hydro_test/cluster/map_reduce.rs

1use hydro_lang::*;
2
/// Location tag for the single process in `map_reduce` that produces the input
/// words and prints the aggregated per-word totals. Carries no data.
pub struct Leader {}

/// Location tag for the members of the cluster in `map_reduce` that count the
/// words routed to them. Carries no data.
pub struct Worker {}
5
/// Builds a small word-count map/reduce dataflow on `flow`.
///
/// The leader process emits a fixed list of words and distributes them round-robin
/// across the worker cluster. Each worker counts how often each word appears within
/// one tick's batch and sends those partial counts to the leader. The leader sums
/// them per word and prints the totals.
///
/// Returns the leader process and worker cluster handles created on `flow`.
///
/// NOTE(review): the snapshot test in `tests` pins the IR and surface syntax built
/// from this function. That output likely embeds the quoted closure text and the
/// order in which locations/ticks are created here. Expect to re-accept snapshots
/// after changing either.
pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
    // Locations: one leader process and one worker cluster. Their location types
    // are fixed by the return type.
    let process = flow.process();
    let cluster = flow.cluster();

    // Input: a hard-coded list of words on the leader, converted to owned `String`s.
    let words = process
        .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
        .map(q!(|s| s.to_string()));

    // Scatter the words across the workers round-robin. Each word is paired with
    // `()` so that it becomes the key for the keyed fold below.
    let partitioned_words = words
        .round_robin_bincode(&cluster)
        .map(q!(|string| (string, ())));

    // Map side (on each worker), per tick batch:
    //   - count each word's occurrences,
    //   - print the partial count,
    //   - send `(word, count)` pairs to the leader.
    // The leader destructures what it receives as plain `(string, count)`, so no
    // sender identity is attached to these pairs.
    let batches = unsafe {
        // SAFETY: addition is associative so we can batch reduce
        // (however the words are split into tick batches, the per-word sums that
        // the leader eventually computes come out the same).
        partitioned_words.tick_batch(&cluster.tick())
    }
    .fold_keyed(q!(|| 0), q!(|count, _| *count += 1))
    .inspect(q!(|(string, count)| println!(
        "partition count: {} - {}",
        string, count
    )))
    .all_ticks()
    .send_bincode_anonymous(&process);

    // Reduce side (on the leader): sum the partial counts per word and print
    // `word: total`.
    // NOTE(review): `persist()` presumably retains batches received in earlier
    // ticks. That would make each tick's reduce yield cumulative totals rather than
    // per-tick sums; confirm against Hydro's `persist` semantics.
    unsafe {
        // SAFETY: addition is associative so we can batch reduce
        // (`+=` on the integer counts is also commutative, which the
        // `_commutative` reduce relies on, so arrival order does not matter).
        batches
            .tick_batch(&process.tick())
            .persist()
            .reduce_keyed_commutative(q!(|total, count| *total += count))
    }
    .all_ticks()
    .for_each(q!(|(string, count)| println!("{}: {}", string, count)));

    (process, cluster)
}
42
#[cfg(test)]
mod tests {
    use hydro_lang::deploy::DeployRuntime;
    use stageleft::RuntimeData;

    /// Snapshot test for `map_reduce`. It builds the flow with the default
    /// optimizations for `DeployRuntime`, then pins:
    ///   - the resulting Hydro IR;
    ///   - the DFIR surface syntax compiled for each graph id.
    ///
    /// NOTE(review): insta derives snapshot file names from this test's name (plus
    /// the suffix set below). Renaming the test or changing the suffix format will
    /// likely orphan the stored snapshots.
    #[test]
    fn map_reduce_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        // Only the side effect of registering the flow on `builder` matters here;
        // the returned location handles are discarded.
        let _ = super::map_reduce(&builder);
        let built = builder.with_default_optimize::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        // One snapshot per compiled DFIR graph, distinguished by a
        // `surface_graph_{id}` suffix. "FAKE" is presumably a placeholder runtime-data
        // expression: the generated code is only snapshotted here, never executed.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}