// hydro_test/cluster/map_reduce.rs
use hydro_lang::*;
2
/// Marker type tagging the leader location: `map_reduce` returns a
/// `Process<'a, Leader>` that emits the words and prints the final totals.
pub struct Leader {}
/// Marker type tagging the worker location: `map_reduce` returns a
/// `Cluster<'a, Worker>` whose members compute the partial word counts.
pub struct Worker {}
5
6pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
7 let process = flow.process();
8 let cluster = flow.cluster();
9
10 let words = process
11 .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
12 .map(q!(|s| s.to_string()));
13
14 let partitioned_words = words
15 .round_robin_bincode(&cluster, nondet!())
16 .map(q!(|string| (string, ())))
17 .into_keyed();
18
19 let batches = partitioned_words
20 .batch(
21 &cluster.tick(),
22 nondet!(),
23 )
24 .fold(q!(|| 0), q!(|count, _| *count += 1))
25 .entries()
26 .inspect(q!(|(string, count)| println!(
27 "partition count: {} - {}",
28 string, count
29 )))
30 .all_ticks()
31 .send_bincode(&process)
32 .values();
33
34 let reduced = batches
35 .into_keyed()
36 .reduce_commutative(q!(|total, count| *total += count));
37
38 reduced
39 .snapshot(&process.tick(), nondet!())
40 .entries()
41 .all_ticks()
42 .for_each(q!(|(string, count)| println!("{}: {}", string, count)));
43
44 (process, cluster)
45}
46
#[cfg(test)]
mod tests {
    use hydro_lang::FlowBuilder;
    use hydro_lang::deploy::HydroDeploy;

    use super::map_reduce;

    /// Snapshots the optimized IR of the `map_reduce` flow, plus the surface syntax
    /// of every DFIR graph produced by a preview compile.
    #[test]
    fn map_reduce_ir() {
        let flow = FlowBuilder::new();
        // Only the graph recorded on `flow` matters here; the returned location
        // handles are discarded.
        let _ = map_reduce(&flow);
        let built = flow.with_default_optimize::<HydroDeploy>();

        hydro_build_utils::assert_debug_snapshot!(built.ir());

        let compiled = built.preview_compile();
        for (id, ir) in compiled.all_dfir() {
            // One snapshot per graph, told apart by the id-based suffix.
            hydro_build_utils::insta::with_settings!({
                snapshot_suffix => format!("surface_graph_{id}")
            }, {
                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}