hydro_test/cluster/
many_to_many.rs1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::prelude::*;
3
4pub fn many_to_many<'a>(flow: &mut FlowBuilder<'a>) -> Cluster<'a, ()> {
5 let cluster = flow.cluster();
6 cluster
7 .source_iter(q!(0..2))
8 .broadcast(
9 &cluster,
10 TCP.fail_stop().bincode().name("m2m_broadcast"),
11 nondet!(),
12 )
13 .entries()
14 .assume_ordering::<TotalOrder>(nondet!())
15 .for_each(q!(|n| println!("cluster received: {:?}", n)));
16
17 cluster
18}
19
#[cfg(test)]
mod tests {
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::DeployCrateWrapper;

    /// Snapshots the debug rendering of the IR produced by the finalized flow.
    #[test]
    fn many_to_many_ir() {
        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let _ = super::many_to_many(&mut builder);
        let built = builder.finalize();

        hydro_build_utils::assert_debug_snapshot!(built.ir());
    }

    /// Deploys the flow onto two localhost members and checks that each member prints
    /// one line per (sender, value) pair, i.e. four lines in total.
    #[tokio::test]
    async fn many_to_many() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let cluster = super::many_to_many(&mut builder);

        let nodes = builder
            .with_default_optimize()
            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Stdout handles are obtained before `start()`; presumably so that no early
        // output is missed — confirm against the deploy API.
        let cluster_stdouts = nodes
            .get_cluster(&cluster)
            .members()
            .iter()
            .map(|node| node.stdout())
            .collect::<Vec<_>>();

        deployment.start().await.unwrap();

        // The lines every member must print, already in sorted order:
        // sender 0 with values 0 and 1, then sender 1 with values 0 and 1.
        let expected_lines = (0..2)
            .flat_map(|sender| {
                (0..2).map(move |value| {
                    format!("cluster received: (MemberId::<()>({}), {})", sender, value)
                })
            })
            .collect::<Vec<_>>();

        for mut stdout in cluster_stdouts {
            let mut received_lines = Vec::new();
            while received_lines.len() < 4 {
                received_lines.push(stdout.recv().await.unwrap());
            }

            // Arrival order is not fixed, so compare after sorting.
            received_lines.sort();

            assert_eq!(received_lines, expected_lines);
        }
    }
}