Skip to main content

hydro_test/cluster/
many_to_many.rs

1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::prelude::*;
3
4pub fn many_to_many<'a>(flow: &mut FlowBuilder<'a>) -> Cluster<'a, ()> {
5    let cluster = flow.cluster();
6    cluster
7        .source_iter(q!(0..2))
8        .broadcast(
9            &cluster,
10            TCP.fail_stop().bincode().name("m2m_broadcast"),
11            nondet!(/** test */),
12        )
13        .entries()
14        .assume_ordering::<TotalOrder>(nondet!(/** intentionally unordered logs */))
15        .for_each(q!(|n| println!("cluster received: {:?}", n)));
16
17    cluster
18}
19
20#[cfg(test)]
21mod tests {
22    use hydro_deploy::Deployment;
23    use hydro_lang::deploy::DeployCrateWrapper;
24
25    #[test]
26    fn many_to_many_ir() {
27        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
28        let _ = super::many_to_many(&mut builder);
29        let built = builder.finalize();
30
31        hydro_build_utils::assert_debug_snapshot!(built.ir());
32    }
33
34    #[tokio::test]
35    async fn many_to_many() {
36        let mut deployment = Deployment::new();
37
38        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
39        let cluster = super::many_to_many(&mut builder);
40
41        let nodes = builder
42            .with_default_optimize()
43            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
44            .deploy(&mut deployment);
45
46        deployment.deploy().await.unwrap();
47
48        let cluster_stdouts = nodes
49            .get_cluster(&cluster)
50            .members()
51            .iter()
52            .map(|node| node.stdout())
53            .collect::<Vec<_>>();
54
55        deployment.start().await.unwrap();
56
57        for mut node_stdout in cluster_stdouts {
58            let mut node_outs = vec![];
59            for _i in 0..4 {
60                node_outs.push(node_stdout.recv().await.unwrap());
61            }
62            node_outs.sort();
63
64            let mut node_outs = node_outs.into_iter();
65
66            for sender in 0..2 {
67                for value in 0..2 {
68                    assert_eq!(
69                        node_outs.next().unwrap(),
70                        format!("cluster received: (MemberId::<()>({}), {})", sender, value)
71                    );
72                }
73            }
74        }
75    }
76}