hydro_test/cluster/map_reduce.rs
use hydro_lang::*;

/// Location tag for the single process in [`map_reduce`] that produces the
/// input words and prints the reduced per-word totals.
pub struct Leader {}
/// Location tag for the cluster members in [`map_reduce`] that each count
/// their round-robin share of the words.
pub struct Worker {}

6pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'a, Worker>) {
7 let process = flow.process();
8 let cluster = flow.cluster();
9
10 let words = process
11 .source_iter(q!(vec!["abc", "abc", "xyz", "abc"]))
12 .map(q!(|s| s.to_string()));
13
14 let partitioned_words = words
15 .round_robin_bincode(&cluster)
16 .map(q!(|string| (string, ())));
17
18 let batches = unsafe {
19 partitioned_words.tick_batch(&cluster.tick())
21 }
22 .fold_keyed(q!(|| 0), q!(|count, _| *count += 1))
23 .inspect(q!(|(string, count)| println!(
24 "partition count: {} - {}",
25 string, count
26 )))
27 .all_ticks()
28 .send_bincode_anonymous(&process);
29
30 unsafe {
31 batches
33 .tick_batch(&process.tick())
34 .persist()
35 .reduce_keyed_commutative(q!(|total, count| *total += count))
36 }
37 .all_ticks()
38 .for_each(q!(|(string, count)| println!("{}: {}", string, count)));
39
40 (process, cluster)
41}
42
#[cfg(test)]
mod tests {
    use hydro_lang::deploy::DeployRuntime;
    use stageleft::RuntimeData;

    /// Snapshot-tests the map/reduce dataflow. It records the optimized Hydro IR
    /// and the DFIR surface syntax of every graph compiled from it.
    #[test]
    fn map_reduce_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let _ = super::map_reduce(&builder);
        let built = builder.with_default_optimize::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        // `"FAKE"` appears to be placeholder runtime data. The compiled graphs
        // are only snapshotted here and are never executed.
        for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
            // Give each compiled graph its own snapshot, suffixed by its id.
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}