hydro_test/cluster/
simple_cluster.rs

1use hydro_lang::*;
2use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
3use stageleft::IntoQuotedMut;
4
5pub fn partition<'a, F: Fn((ClusterId<()>, String)) -> (ClusterId<()>, String) + 'a>(
6    cluster1: Cluster<'a, ()>,
7    cluster2: Cluster<'a, ()>,
8    dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
9) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
10    cluster1
11        .source_iter(q!(vec!(CLUSTER_SELF_ID)))
12        .map(q!(move |id| (
13            ClusterId::<()>::from_raw(id.raw_id),
14            format!("Hello from {}", id.raw_id)
15        )))
16        .send_partitioned(&cluster2, dist_policy)
17        .for_each(q!(move |message| println!(
18            "My self id is {}, my message is {:?}",
19            CLUSTER_SELF_ID.raw_id, message
20        )));
21    (cluster1, cluster2)
22}
23
24pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
25    let cluster1 = flow.cluster();
26    let cluster2 = flow.cluster();
27    cluster1
28        .source_iter(q!(vec!(CLUSTER_SELF_ID)))
29        // .for_each(q!(|message| println!("hey, {}", message)))
30        .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
31        .decouple_cluster(&cluster2)
32        .for_each(q!(move |message| println!(
33            "My self id is {}, my message is {}",
34            CLUSTER_SELF_ID, message
35        )));
36    (cluster1, cluster2)
37}
38
39pub fn decouple_process<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
40    let process1 = flow.process();
41    let process2 = flow.process();
42    process1
43        .source_iter(q!(0..3))
44        .decouple_process(&process2)
45        .for_each(q!(|message| println!("I received message is {}", message)));
46    (process1, process2)
47}
48
49pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
50    let process = flow.process();
51    let cluster = flow.cluster();
52
53    let numbers = process.source_iter(q!(0..5));
54    let ids = process.source_iter(cluster.members()).map(q!(|&id| id));
55
56    ids.cross_product(numbers)
57        .map(q!(|(id, n)| (id, (id, n))))
58        .send_bincode(&cluster)
59        .inspect(q!(move |n| println!(
60            "cluster received: {:?} (self cluster id: {})",
61            n, CLUSTER_SELF_ID
62        )))
63        .send_bincode(&process)
64        .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
65
66    (process, cluster)
67}
68
/// Localhost deployment tests and IR snapshot tests for the flows defined in this file.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hydro_deploy::Deployment;
    use hydro_lang::deploy::{DeployCrateWrapper, DeployRuntime};
    use hydro_lang::rewrites::partitioner::{self, PartitionAttribute, Partitioner};
    use hydro_lang::rewrites::{insert_counter, persist_pullup};
    use hydro_lang::{ClusterId, Location};
    use stageleft::{RuntimeData, q};

    /// Runs `super::simple_cluster` on one localhost process plus a two-member localhost
    /// cluster, snapshots the optimized IR, and checks the lines printed on both sides.
    #[tokio::test]
    async fn simple_cluster() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (node, cluster) = super::simple_cluster(&builder);
        let built = builder.with_default_optimize();

        insta::assert_debug_snapshot!(built.ir());

        let nodes = built
            .with_process(&node, deployment.Localhost())
            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // All stdout handles are obtained before `start()` is called.
        let mut node_stdout = nodes.get_process(&node).stdout().await;
        let cluster_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Each cluster member is expected to print the five numbers addressed to it, in order.
        for (member_idx, mut member_stdout) in cluster_stdouts.into_iter().enumerate() {
            for n in 0..5 {
                let expected = format!(
                    "cluster received: (ClusterId::<()>({}), {}) (self cluster id: ClusterId::<()>({}))",
                    member_idx, n, member_idx
                );
                assert_eq!(member_stdout.recv().await.unwrap(), expected);
            }
        }

        // The process receives ten lines in total; they are sorted before comparison so the
        // check does not depend on arrival order.
        let mut node_outs = Vec::with_capacity(10);
        for _ in 0..10 {
            node_outs.push(node_stdout.recv().await.unwrap());
        }
        node_outs.sort();

        for (line_idx, line) in node_outs.into_iter().enumerate() {
            let member_idx = line_idx / 5;
            let n = line_idx % 5;
            let expected = format!(
                "node received: (ClusterId::<()>({}), (ClusterId::<()>({}), {}))",
                member_idx, member_idx, n
            );
            assert_eq!(line, expected);
        }
    }

    /// Runs `super::decouple_process` on two localhost processes and checks that the second
    /// process prints the values 0, 1, 2 in order.
    #[tokio::test]
    async fn decouple_process() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (process1, process2) = super::decouple_process(&builder);
        let built = builder.with_default_optimize();

        let nodes = built
            .with_process(&process1, deployment.Localhost())
            .with_process(&process2, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();
        let mut receiver_stdout = nodes.get_process(&process2).stdout().await;
        deployment.start().await.unwrap();

        for value in 0..3 {
            let received_line = receiver_stdout.recv().await.unwrap();
            assert_eq!(received_line, format!("I received message is {}", value));
        }
    }

    /// Runs `super::decouple_cluster` on two three-member localhost clusters and checks that
    /// the i-th member of the second cluster prints a message carrying id i.
    #[tokio::test]
    async fn decouple_cluster() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (cluster1, cluster2) = super::decouple_cluster(&builder);
        let built = builder.with_default_optimize();

        let nodes = built
            .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
            .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let cluster2_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster2)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Exactly one line is read from each receiving member.
        for (member_idx, mut member_stdout) in cluster2_stdouts.into_iter().enumerate() {
            let expected_message = format!(
                "My self id is ClusterId::<()>({}), my message is ClusterId::<()>({})",
                member_idx, member_idx
            );
            assert_eq!(member_stdout.recv().await.unwrap(), expected_message);
        }
    }

    /// Runs `super::partition` with three senders and six receivers. The policy multiplies the
    /// id's raw id by `num_partitions`; the test then checks that every receiver whose index is
    /// a multiple of `num_partitions` prints the greeting from sender `index / num_partitions`.
    /// Receivers at other indices are not inspected.
    #[tokio::test]
    async fn partition() {
        let mut deployment = Deployment::new();

        let num_nodes = 3;
        let num_partitions = 2;
        let builder = hydro_lang::FlowBuilder::new();
        let (cluster1, cluster2) = super::partition(
            builder.cluster::<()>(),
            builder.cluster::<()>(),
            q!(move |(id, msg)| (
                ClusterId::<()>::from_raw(id.raw_id * num_partitions as u32),
                msg
            )),
        );
        let built = builder.with_default_optimize();

        let receiver_count = num_nodes * num_partitions;
        let nodes = built
            .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
            .with_cluster(
                &cluster2,
                (0..receiver_count).map(|_| deployment.Localhost()),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let cluster2_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster2)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        for (cluster2_id, mut member_stdout) in cluster2_stdouts.into_iter().enumerate() {
            // Only indices that the policy can produce are checked.
            if cluster2_id % num_partitions != 0 {
                continue;
            }

            let sender_idx = cluster2_id / num_partitions;
            let expected_message = format!(
                r#"My self id is {}, my message is "Hello from {}""#,
                cluster2_id, sender_idx
            );
            assert_eq!(member_stdout.recv().await.unwrap(), expected_message);
        }
    }

    /// Applies `persist_pullup` and then the partitioner rewrite to the `simple_cluster` flow
    /// and snapshots both the resulting IR and the surface syntax of every compiled graph.
    #[test]
    fn partitioned_simple_cluster_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let (_, cluster) = super::simple_cluster(&builder);

        // Node 5 is partitioned on tuple index 1, into three partitions of the flow's cluster.
        let nodes_to_partition = HashMap::from([(5, PartitionAttribute::TupleIndex(1))]);
        let partitioner = Partitioner {
            nodes_to_partition,
            num_partitions: 3,
            partitioned_cluster_id: cluster.id().raw_id(),
        };

        let built = builder
            .optimize_with(persist_pullup::persist_pullup)
            .optimize_with(|leaves| partitioner::partition(leaves, &partitioner))
            .into_deploy::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        let compiled = built.compile(&RuntimeData::new("FAKE"));
        for (id, graph) in compiled.all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(graph.surface_syntax_string());
            });
        }
    }

    /// Applies `persist_pullup` and then `insert_counter` (with a one-second duration) to the
    /// `simple_cluster` flow and snapshots both the resulting IR and the surface syntax of
    /// every compiled graph.
    #[test]
    fn counter_simple_cluster_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let _ = super::simple_cluster(&builder);
        let counter_output_duration = q!(std::time::Duration::from_secs(1));

        let built = builder
            .optimize_with(persist_pullup::persist_pullup)
            .optimize_with(|leaves| insert_counter::insert_counter(leaves, counter_output_duration))
            .into_deploy::<DeployRuntime>();

        insta::assert_debug_snapshot!(built.ir());

        let compiled = built.compile(&RuntimeData::new("FAKE"));
        for (id, graph) in compiled.all_dfir() {
            insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
                insta::assert_snapshot!(graph.surface_syntax_string());
            });
        }
    }
}