hydro_test/cluster/
map_reduce.rs1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::prelude::*;
3
4pub struct Leader {}
5pub struct Worker {}
6
7pub fn map_reduce<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
8 let process = flow.process();
9 let cluster = flow.cluster();
10
11 let words = process
12 .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
13 .map(q!(|s| s.to_owned()));
14
15 let partitioned_words = words
16 .round_robin(&cluster, TCP.fail_stop().bincode(), nondet!())
17 .map(q!(|string| (string, ())))
18 .into_keyed();
19
20 let batches = partitioned_words
21 .batch(
22 &cluster.tick(),
23 nondet!(),
24 )
25 .fold(q!(|| 0), q!(|count, _| *count += 1))
26 .entries()
27 .inspect(q!(|(string, count)| println!(
28 "partition count: {} - {}",
29 string, count
30 )))
31 .all_ticks()
32 .send(&process, TCP.fail_stop().bincode())
33 .values();
34
35 let reduced = batches.into_keyed().reduce(q!(
36 |total, count| *total += count,
37 commutative = manual_proof!()
38 ));
39
40 reduced
41 .snapshot(&process.tick(), nondet!())
42 .entries()
43 .all_ticks()
44 .assume_ordering::<TotalOrder>(nondet!())
45 .for_each(q!(|(string, count)| println!("{}: {}", string, count)));
46
47 (process, cluster)
48}
49
50#[cfg(test)]
51mod tests {
52 use hydro_lang::deploy::HydroDeploy;
53
54 #[test]
55 fn map_reduce_ir() {
56 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
57 let _ = super::map_reduce(&mut builder);
58 let mut built = builder.with_default_optimize::<HydroDeploy>();
59
60 hydro_build_utils::assert_debug_snapshot!(built.ir());
61
62 for (location_key, ir) in built.preview_compile().all_dfir() {
63 hydro_build_utils::insta::with_settings!({
64 snapshot_suffix => format!("surface_graph_{location_key}")
65 }, {
66 hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
67 });
68 }
69 }
70}