Skip to main content

hydro_test/cluster/
simple_cluster.rs

1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::location::cluster::CLUSTER_SELF_ID;
3use hydro_lang::location::{MemberId, MembershipEvent};
4use hydro_lang::prelude::*;
5use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
6use stageleft::IntoQuotedMut;
7
8pub fn partition<'a, F: Fn((MemberId<()>, String)) -> (MemberId<()>, String) + 'a>(
9    cluster1: Cluster<'a, ()>,
10    cluster2: Cluster<'a, ()>,
11    dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
12) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
13    cluster1
14        .source_stream(q!(tokio_stream::iter(vec!(CLUSTER_SELF_ID))))
15        .map(q!(move |id| (id.clone(), format!("Hello from {}", id))))
16        .send_partitioned(&cluster2, dist_policy)
17        .assume_ordering::<TotalOrder>(nondet!(/** testing, order does not matter */))
18        .for_each(q!(move |message| println!(
19            "My self id is {}, my message is {:?}",
20            CLUSTER_SELF_ID, message
21        )));
22    (cluster1, cluster2)
23}
24
25pub fn decouple_cluster<'a>(flow: &mut FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
26    let cluster1 = flow.cluster();
27    let cluster2 = flow.cluster();
28    cluster1
29        .source_iter(q!(vec!(CLUSTER_SELF_ID)))
30        // .for_each(q!(|message| println!("hey, {}", message)))
31        .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
32        .decouple_cluster(&cluster2)
33        .for_each(q!(move |message| println!(
34            "My self id is {}, my message is {}",
35            CLUSTER_SELF_ID, message
36        )));
37    (cluster1, cluster2)
38}
39
40pub fn decouple_process<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
41    let process1 = flow.process();
42    let process2 = flow.process();
43    process1
44        .source_iter(q!(0..3))
45        .decouple_process(&process2)
46        .for_each(q!(|message| println!("I received message is {}", message)));
47    (process1, process2)
48}
49
50pub fn simple_cluster<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
51    let process = flow.process();
52    let cluster = flow.cluster();
53
54    let numbers = process.source_iter(q!(0..5));
55    let ids = process
56        .source_cluster_members(&cluster)
57        .entries()
58        .filter_map(q!(|(i, e)| match e {
59            MembershipEvent::Joined => Some(i),
60            MembershipEvent::Left => None,
61        }));
62
63    ids.cross_product(numbers.into())
64        .map(q!(|(id, n)| (id.clone(), (id, n))))
65        .demux(&cluster, TCP.fail_stop().bincode())
66        .inspect(q!(move |n| println!(
67            "cluster received: {:?} (self cluster id: {})",
68            n, CLUSTER_SELF_ID
69        )))
70        .send(&process, TCP.fail_stop().bincode())
71        .entries()
72        .assume_ordering::<TotalOrder>(nondet!(/** testing, order does not matter */))
73        .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
74
75    (process, cluster)
76}
77
78#[cfg(test)]
79mod tests {
80    use hydro_deploy::Deployment;
81    use hydro_lang::deploy::DeployCrateWrapper;
82    use hydro_lang::location::MemberId;
83    use stageleft::q;
84
85    #[test]
86    fn simple_cluster_ir() {
87        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
88        let _ = super::simple_cluster(&mut builder);
89        let built = builder.finalize();
90
91        hydro_build_utils::assert_debug_snapshot!(built.ir());
92    }
93
94    #[tokio::test]
95    async fn simple_cluster() {
96        let mut deployment = Deployment::new();
97
98        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
99        let (node, cluster) = super::simple_cluster(&mut builder);
100
101        let nodes = builder
102            .with_default_optimize()
103            .with_process(&node, deployment.Localhost())
104            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
105            .deploy(&mut deployment);
106
107        deployment.deploy().await.unwrap();
108
109        let mut node_stdout = nodes.get_process(&node).stdout();
110        let cluster_stdouts = nodes
111            .get_cluster(&cluster)
112            .members()
113            .iter()
114            .map(|node| node.stdout())
115            .collect::<Vec<_>>();
116
117        deployment.start().await.unwrap();
118
119        for (i, mut stdout) in cluster_stdouts.into_iter().enumerate() {
120            for j in 0..5 {
121                assert_eq!(
122                    stdout.recv().await.unwrap(),
123                    format!(
124                        "cluster received: (MemberId::<()>({}), {}) (self cluster id: MemberId::<()>({}))",
125                        i, j, i
126                    )
127                );
128            }
129        }
130
131        let mut node_outs = vec![];
132        for _i in 0..10 {
133            node_outs.push(node_stdout.recv().await.unwrap());
134        }
135        node_outs.sort();
136
137        for (i, n) in node_outs.into_iter().enumerate() {
138            assert_eq!(
139                n,
140                format!(
141                    "node received: (MemberId::<()>({}), (MemberId::<()>({}), {}))",
142                    i / 5,
143                    i / 5,
144                    i % 5
145                )
146            );
147        }
148    }
149
150    #[tokio::test]
151    async fn decouple_process() {
152        let mut deployment = Deployment::new();
153
154        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
155        let (process1, process2) = super::decouple_process(&mut builder);
156        let built = builder.with_default_optimize();
157
158        let nodes = built
159            .with_process(&process1, deployment.Localhost())
160            .with_process(&process2, deployment.Localhost())
161            .deploy(&mut deployment);
162
163        deployment.deploy().await.unwrap();
164        let mut process2_stdout = nodes.get_process(&process2).stdout();
165        deployment.start().await.unwrap();
166        for i in 0..3 {
167            let expected_message = format!("I received message is {}", i);
168            assert_eq!(process2_stdout.recv().await.unwrap(), expected_message);
169        }
170    }
171
172    #[tokio::test]
173    async fn decouple_cluster() {
174        let mut deployment = Deployment::new();
175
176        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
177        let (cluster1, cluster2) = super::decouple_cluster(&mut builder);
178        let built = builder.with_default_optimize();
179
180        let nodes = built
181            .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
182            .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
183            .deploy(&mut deployment);
184
185        deployment.deploy().await.unwrap();
186
187        let cluster2_stdouts = nodes
188            .get_cluster(&cluster2)
189            .members()
190            .iter()
191            .map(|node| node.stdout())
192            .collect::<Vec<_>>();
193
194        deployment.start().await.unwrap();
195
196        for (i, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
197            for _j in 0..1 {
198                let expected_message = format!(
199                    "My self id is MemberId::<()>({}), my message is MemberId::<()>({})",
200                    i, i
201                );
202                assert_eq!(stdout.recv().await.unwrap(), expected_message);
203            }
204        }
205    }
206
207    #[tokio::test]
208    async fn partition() {
209        let mut deployment = Deployment::new();
210
211        let num_nodes = 3;
212        let num_partitions = 2;
213        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
214        let (cluster1, cluster2) = super::partition(
215            builder.cluster::<()>(),
216            builder.cluster::<()>(),
217            q!(move |(id, msg)| (
218                MemberId::<()>::from_raw_id(id.get_raw_id() * num_partitions as u32),
219                msg
220            )),
221        );
222        let built = builder.with_default_optimize();
223
224        let nodes = built
225            .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
226            .with_cluster(
227                &cluster2,
228                (0..num_nodes * num_partitions).map(|_| deployment.Localhost()),
229            )
230            .deploy(&mut deployment);
231
232        deployment.deploy().await.unwrap();
233
234        let cluster2_stdouts = nodes
235            .get_cluster(&cluster2)
236            .members()
237            .iter()
238            .map(|node| node.stdout())
239            .collect::<Vec<_>>();
240
241        deployment.start().await.unwrap();
242
243        for (cluster2_id, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
244            if cluster2_id % num_partitions == 0 {
245                let expected_message = format!(
246                    r#"My self id is MemberId::<()>({}), my message is "Hello from MemberId::<()>({})""#,
247                    cluster2_id,
248                    cluster2_id / num_partitions
249                );
250                assert_eq!(stdout.recv().await.unwrap(), expected_message);
251            }
252        }
253    }
254}