hydro_test/cluster/
many_to_many.rs

1use hydro_lang::*;
2
3pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> {
4    let cluster = flow.cluster();
5    cluster
6        .source_iter(q!(0..2))
7        .broadcast_bincode(&cluster, nondet!(/** test */))
8        .entries()
9        .for_each(q!(|n| println!("cluster received: {:?}", n)));
10
11    cluster
12}
13
14#[cfg(test)]
15mod tests {
16    use hydro_deploy::Deployment;
17    use hydro_lang::deploy::DeployCrateWrapper;
18
19    #[test]
20    fn many_to_many_ir() {
21        let builder = hydro_lang::FlowBuilder::new();
22        let _ = super::many_to_many(&builder);
23        let built = builder.finalize();
24
25        hydro_build_utils::assert_debug_snapshot!(built.ir());
26    }
27
28    #[tokio::test]
29    async fn many_to_many() {
30        let mut deployment = Deployment::new();
31
32        let builder = hydro_lang::FlowBuilder::new();
33        let cluster = super::many_to_many(&builder);
34
35        let nodes = builder
36            .with_default_optimize()
37            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
38            .deploy(&mut deployment);
39
40        deployment.deploy().await.unwrap();
41
42        let cluster_stdouts = futures::future::join_all(
43            nodes
44                .get_cluster(&cluster)
45                .members()
46                .iter()
47                .map(|node| node.stdout()),
48        )
49        .await;
50
51        deployment.start().await.unwrap();
52
53        for mut node_stdout in cluster_stdouts {
54            let mut node_outs = vec![];
55            for _i in 0..4 {
56                node_outs.push(node_stdout.recv().await.unwrap());
57            }
58            node_outs.sort();
59
60            let mut node_outs = node_outs.into_iter();
61
62            for sender in 0..2 {
63                for value in 0..2 {
64                    assert_eq!(
65                        node_outs.next().unwrap(),
66                        format!("cluster received: (MemberId::<()>({}), {})", sender, value)
67                    );
68                }
69            }
70        }
71    }
72}