hydro_test/cluster/
many_to_many.rs
1use hydro_lang::*;
2
3pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> {
4 let cluster = flow.cluster();
5 cluster
6 .source_iter(q!(0..2))
7 .broadcast_bincode(&cluster)
8 .for_each(q!(|n| println!("cluster received: {:?}", n)));
9
10 cluster
11}
12
#[cfg(test)]
mod tests {
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::DeployCrateWrapper;

    /// Deploys the `many_to_many` flow on two localhost members and checks
    /// that each member prints one line per (sender, value) combination.
    #[tokio::test]
    async fn many_to_many() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let cluster = super::many_to_many(&builder);
        let built = builder.with_default_optimize();

        // Pin the optimized IR against the stored snapshot.
        insta::assert_debug_snapshot!(built.ir());

        // Two cluster members, each on its own localhost host.
        let nodes = built
            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Obtain the stdout receivers before starting the deployment.
        let cluster_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Expected lines in (sender, value) order; the actual lines are
        // sorted before being compared against this sequence.
        let expected: Vec<String> = (0..2)
            .flat_map(|sender| {
                (0..2).map(move |value| {
                    format!("cluster received: (ClusterId::<()>({}), {})", sender, value)
                })
            })
            .collect();

        for mut stdout in cluster_stdouts {
            // Read exactly four lines from this member.
            let mut lines = Vec::new();
            for _ in 0..4 {
                lines.push(stdout.recv().await.unwrap());
            }
            // Sort the lines because they are not compared in arrival order.
            lines.sort();

            for (actual, wanted) in lines.into_iter().zip(&expected) {
                assert_eq!(actual, *wanted);
            }
        }
    }
}