hydro_test/cluster/
many_to_many.rs

1use hydro_lang::*;
2
3pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> {
4    let cluster = flow.cluster();
5    cluster
6        .source_iter(q!(0..2))
7        .broadcast_bincode(&cluster)
8        .for_each(q!(|n| println!("cluster received: {:?}", n)));
9
10    cluster
11}
12
13#[cfg(test)]
14mod tests {
15    use hydro_deploy::Deployment;
16    use hydro_lang::deploy::DeployCrateWrapper;
17
18    #[tokio::test]
19    async fn many_to_many() {
20        let mut deployment = Deployment::new();
21
22        let builder = hydro_lang::FlowBuilder::new();
23        let cluster = super::many_to_many(&builder);
24        let built = builder.with_default_optimize();
25
26        insta::assert_debug_snapshot!(built.ir());
27
28        let nodes = built
29            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
30            .deploy(&mut deployment);
31
32        deployment.deploy().await.unwrap();
33
34        let cluster_stdouts = futures::future::join_all(
35            nodes
36                .get_cluster(&cluster)
37                .members()
38                .iter()
39                .map(|node| node.stdout()),
40        )
41        .await;
42
43        deployment.start().await.unwrap();
44
45        for mut node_stdout in cluster_stdouts {
46            let mut node_outs = vec![];
47            for _i in 0..4 {
48                node_outs.push(node_stdout.recv().await.unwrap());
49            }
50            node_outs.sort();
51
52            let mut node_outs = node_outs.into_iter();
53
54            for sender in 0..2 {
55                for value in 0..2 {
56                    assert_eq!(
57                        node_outs.next().unwrap(),
58                        format!("cluster received: (ClusterId::<()>({}), {})", sender, value)
59                    );
60                }
61            }
62        }
63    }
64}