hydro_test/cluster/
many_to_many.rs1use hydro_lang::*;
2
3pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> {
4 let cluster = flow.cluster();
5 cluster
6 .source_iter(q!(0..2))
7 .broadcast_bincode(&cluster, nondet!())
8 .entries()
9 .for_each(q!(|n| println!("cluster received: {:?}", n)));
10
11 cluster
12}
13
14#[cfg(test)]
15mod tests {
16 use hydro_deploy::Deployment;
17 use hydro_lang::deploy::DeployCrateWrapper;
18
19 #[test]
20 fn many_to_many_ir() {
21 let builder = hydro_lang::FlowBuilder::new();
22 let _ = super::many_to_many(&builder);
23 let built = builder.finalize();
24
25 hydro_build_utils::assert_debug_snapshot!(built.ir());
26 }
27
28 #[tokio::test]
29 async fn many_to_many() {
30 let mut deployment = Deployment::new();
31
32 let builder = hydro_lang::FlowBuilder::new();
33 let cluster = super::many_to_many(&builder);
34
35 let nodes = builder
36 .with_default_optimize()
37 .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
38 .deploy(&mut deployment);
39
40 deployment.deploy().await.unwrap();
41
42 let cluster_stdouts = futures::future::join_all(
43 nodes
44 .get_cluster(&cluster)
45 .members()
46 .iter()
47 .map(|node| node.stdout()),
48 )
49 .await;
50
51 deployment.start().await.unwrap();
52
53 for mut node_stdout in cluster_stdouts {
54 let mut node_outs = vec![];
55 for _i in 0..4 {
56 node_outs.push(node_stdout.recv().await.unwrap());
57 }
58 node_outs.sort();
59
60 let mut node_outs = node_outs.into_iter();
61
62 for sender in 0..2 {
63 for value in 0..2 {
64 assert_eq!(
65 node_outs.next().unwrap(),
66 format!("cluster received: (MemberId::<()>({}), {})", sender, value)
67 );
68 }
69 }
70 }
71 }
72}