1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use hydro_lang::*;

/// Marker type with no fields; `map_reduce` uses it as the tag parameter of
/// the `Process<'a, Leader>` location it returns.
pub struct Leader {}
/// Marker type with no fields; `map_reduce` uses it as the tag parameter of
/// the `Cluster<'a, Worker>` location it returns.
pub struct Worker {}

/// Assembles a word-count ("map-reduce") dataflow on `flow`.
///
/// Operators are wired in this order:
/// 1. The leader process emits a fixed list of four words, converted to
///    owned `String`s.
/// 2. The words are handed to the worker cluster with `round_robin_bincode`
///    and each is paired with `()` so the word itself acts as the key.
/// 3. On the cluster, each tick's batch is counted per word with
///    `fold_keyed`, the partial counts are printed, and the results are sent
///    back to the leader with `send_bincode_interleaved`.
/// 4. On the leader, each tick's batch of partial counts is `persist`ed and
///    summed per word with `reduce_keyed_commutative`; every `(word, total)`
///    pair is printed.
///
/// Returns the `(process, cluster)` location handles created on `flow`.
pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
    // Locations are requested in a fixed order: the process first, then the cluster.
    let process = flow.process();
    let cluster = flow.cluster();

    // "Map" side: source the words on the leader and ship them to the workers
    // as `(word, ())` pairs.
    let keyed_words = process
        .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
        .map(q!(|s| s.to_string()))
        .round_robin_bincode(&cluster)
        .map(q!(|string| (string, ())));

    // Per-worker partial counts, computed one tick batch at a time.
    let worker_tick = cluster.tick();
    let worker_batch = unsafe {
        // SAFETY: addition is associative so we can batch reduce
        keyed_words.timestamped(&worker_tick).tick_batch()
    };
    let partial_counts = worker_batch
        .fold_keyed(q!(|| 0), q!(|count, _| *count += 1))
        .inspect(q!(|(string, count)| println!(
            "partition count: {} - {}",
            string, count
        )))
        .all_ticks()
        .send_bincode_interleaved(&process);

    // "Reduce" side: the leader folds the partial counts into per-word totals.
    let leader_tick = process.tick();
    let totals = unsafe {
        // SAFETY: addition is associative so we can batch reduce
        partial_counts
            .timestamped(&leader_tick)
            .tick_batch()
            .persist()
            .reduce_keyed_commutative(q!(|total, count| *total += count))
    };
    totals
        .all_ticks()
        .for_each(q!(|(string, count)| println!("{}: {}", string, count)));

    (process, cluster)
}

// Snapshot tests for the `map_reduce` dataflow.
#[cfg(test)]
mod tests {
    use hydro_lang::deploy::DeployRuntime;
    use stageleft::RuntimeData;

    /// Builds the `map_reduce` flow and snapshots both its optimized IR and
    /// the surface syntax of every graph produced by compiling it.
    #[test]
    fn map_reduce_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        // Only the operators registered on `builder` matter here; the returned
        // location handles are not needed.
        let _ = super::map_reduce(&builder);
        let built = builder.with_default_optimize::<DeployRuntime>();

        // Snapshot of the IR's `Debug` output.
        insta::assert_debug_snapshot!(built.ir());

        // NOTE(review): "FAKE" looks like a placeholder runtime-data name, since
        // this test only inspects the compiled output — confirm.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() {
            // One snapshot per `(id, ir)` pair; the suffix keeps their snapshot
            // names distinct.
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}