hydro_test/cluster/
simple_cluster.rs

1use hydro_lang::location::cluster::CLUSTER_SELF_ID;
2use hydro_lang::location::{MemberId, MembershipEvent};
3use hydro_lang::*;
4use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
5use stageleft::IntoQuotedMut;
6
7pub fn partition<'a, F: Fn((MemberId<()>, String)) -> (MemberId<()>, String) + 'a>(
8    cluster1: Cluster<'a, ()>,
9    cluster2: Cluster<'a, ()>,
10    dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
11) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
12    cluster1
13        .source_iter(q!(vec!(CLUSTER_SELF_ID)))
14        .map(q!(move |id| (
15            MemberId::<()>::from_raw(id.raw_id),
16            format!("Hello from {}", id.raw_id)
17        )))
18        .send_partitioned(&cluster2, dist_policy)
19        .for_each(q!(move |message| println!(
20            "My self id is {}, my message is {:?}",
21            CLUSTER_SELF_ID.raw_id, message
22        )));
23    (cluster1, cluster2)
24}
25
26pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
27    let cluster1 = flow.cluster();
28    let cluster2 = flow.cluster();
29    cluster1
30        .source_iter(q!(vec!(CLUSTER_SELF_ID)))
31        // .for_each(q!(|message| println!("hey, {}", message)))
32        .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
33        .decouple_cluster(&cluster2)
34        .for_each(q!(move |message| println!(
35            "My self id is {}, my message is {}",
36            CLUSTER_SELF_ID, message
37        )));
38    (cluster1, cluster2)
39}
40
41pub fn decouple_process<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
42    let process1 = flow.process();
43    let process2 = flow.process();
44    process1
45        .source_iter(q!(0..3))
46        .decouple_process(&process2)
47        .for_each(q!(|message| println!("I received message is {}", message)));
48    (process1, process2)
49}
50
51pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
52    let process = flow.process();
53    let cluster = flow.cluster();
54
55    let numbers = process.source_iter(q!(0..5));
56    let ids = process
57        .source_cluster_members(&cluster)
58        .entries()
59        .filter_map(q!(|(i, e)| match e {
60            MembershipEvent::Joined => Some(i),
61            MembershipEvent::Left => None,
62        }));
63
64    ids.cross_product(numbers)
65        .map(q!(|(id, n)| (id, (id, n))))
66        .demux_bincode(&cluster)
67        .inspect(q!(move |n| println!(
68            "cluster received: {:?} (self cluster id: {})",
69            n, CLUSTER_SELF_ID
70        )))
71        .send_bincode(&process)
72        .entries()
73        .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
74
75    (process, cluster)
76}
77
78#[cfg(test)]
79mod tests {
80    use std::collections::HashMap;
81
82    use hydro_deploy::Deployment;
83    use hydro_lang::Location;
84    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy};
85    use hydro_lang::location::MemberId;
86    use hydro_lang::rewrites::persist_pullup;
87    use hydro_optimize::partitioner::{self, Partitioner};
88    use stageleft::q;
89
90    #[test]
91    fn simple_cluster_ir() {
92        let builder = hydro_lang::FlowBuilder::new();
93        let _ = super::simple_cluster(&builder);
94        let built = builder.finalize();
95
96        hydro_build_utils::assert_debug_snapshot!(built.ir());
97    }
98
99    #[tokio::test]
100    async fn simple_cluster() {
101        let mut deployment = Deployment::new();
102
103        let builder = hydro_lang::FlowBuilder::new();
104        let (node, cluster) = super::simple_cluster(&builder);
105
106        let nodes = builder
107            .with_default_optimize()
108            .with_process(&node, deployment.Localhost())
109            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
110            .deploy(&mut deployment);
111
112        deployment.deploy().await.unwrap();
113
114        let mut node_stdout = nodes.get_process(&node).stdout().await;
115        let cluster_stdouts = futures::future::join_all(
116            nodes
117                .get_cluster(&cluster)
118                .members()
119                .iter()
120                .map(|node| node.stdout()),
121        )
122        .await;
123
124        deployment.start().await.unwrap();
125
126        for (i, mut stdout) in cluster_stdouts.into_iter().enumerate() {
127            for j in 0..5 {
128                assert_eq!(
129                    stdout.recv().await.unwrap(),
130                    format!(
131                        "cluster received: (MemberId::<()>({}), {}) (self cluster id: MemberId::<()>({}))",
132                        i, j, i
133                    )
134                );
135            }
136        }
137
138        let mut node_outs = vec![];
139        for _i in 0..10 {
140            node_outs.push(node_stdout.recv().await.unwrap());
141        }
142        node_outs.sort();
143
144        for (i, n) in node_outs.into_iter().enumerate() {
145            assert_eq!(
146                n,
147                format!(
148                    "node received: (MemberId::<()>({}), (MemberId::<()>({}), {}))",
149                    i / 5,
150                    i / 5,
151                    i % 5
152                )
153            );
154        }
155    }
156
157    #[tokio::test]
158    async fn decouple_process() {
159        let mut deployment = Deployment::new();
160
161        let builder = hydro_lang::FlowBuilder::new();
162        let (process1, process2) = super::decouple_process(&builder);
163        let built = builder.with_default_optimize();
164
165        let nodes = built
166            .with_process(&process1, deployment.Localhost())
167            .with_process(&process2, deployment.Localhost())
168            .deploy(&mut deployment);
169
170        deployment.deploy().await.unwrap();
171        let mut process2_stdout = nodes.get_process(&process2).stdout().await;
172        deployment.start().await.unwrap();
173        for i in 0..3 {
174            let expected_message = format!("I received message is {}", i);
175            assert_eq!(process2_stdout.recv().await.unwrap(), expected_message);
176        }
177    }
178
179    #[tokio::test]
180    async fn decouple_cluster() {
181        let mut deployment = Deployment::new();
182
183        let builder = hydro_lang::FlowBuilder::new();
184        let (cluster1, cluster2) = super::decouple_cluster(&builder);
185        let built = builder.with_default_optimize();
186
187        let nodes = built
188            .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
189            .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
190            .deploy(&mut deployment);
191
192        deployment.deploy().await.unwrap();
193
194        let cluster2_stdouts = futures::future::join_all(
195            nodes
196                .get_cluster(&cluster2)
197                .members()
198                .iter()
199                .map(|node| node.stdout()),
200        )
201        .await;
202
203        deployment.start().await.unwrap();
204
205        for (i, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
206            for _j in 0..1 {
207                let expected_message = format!(
208                    "My self id is MemberId::<()>({}), my message is MemberId::<()>({})",
209                    i, i
210                );
211                assert_eq!(stdout.recv().await.unwrap(), expected_message);
212            }
213        }
214    }
215
216    #[tokio::test]
217    async fn partition() {
218        let mut deployment = Deployment::new();
219
220        let num_nodes = 3;
221        let num_partitions = 2;
222        let builder = hydro_lang::FlowBuilder::new();
223        let (cluster1, cluster2) = super::partition(
224            builder.cluster::<()>(),
225            builder.cluster::<()>(),
226            q!(move |(id, msg)| (
227                MemberId::<()>::from_raw(id.raw_id * num_partitions as u32),
228                msg
229            )),
230        );
231        let built = builder.with_default_optimize();
232
233        let nodes = built
234            .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
235            .with_cluster(
236                &cluster2,
237                (0..num_nodes * num_partitions).map(|_| deployment.Localhost()),
238            )
239            .deploy(&mut deployment);
240
241        deployment.deploy().await.unwrap();
242
243        let cluster2_stdouts = futures::future::join_all(
244            nodes
245                .get_cluster(&cluster2)
246                .members()
247                .iter()
248                .map(|node| node.stdout()),
249        )
250        .await;
251
252        deployment.start().await.unwrap();
253
254        for (cluster2_id, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
255            if cluster2_id % num_partitions == 0 {
256                let expected_message = format!(
257                    r#"My self id is {}, my message is "Hello from {}""#,
258                    cluster2_id,
259                    cluster2_id / num_partitions
260                );
261                assert_eq!(stdout.recv().await.unwrap(), expected_message);
262            }
263        }
264    }
265
266    #[test]
267    fn partitioned_simple_cluster_ir() {
268        let builder = hydro_lang::FlowBuilder::new();
269        let (_, cluster) = super::simple_cluster(&builder);
270        let partitioner = Partitioner {
271            nodes_to_partition: HashMap::from([(5, vec!["1".to_string()])]),
272            num_partitions: 3,
273            location_id: cluster.id().raw_id(),
274            new_cluster_id: None,
275        };
276        let built = builder
277            .optimize_with(persist_pullup::persist_pullup)
278            .optimize_with(|roots| partitioner::partition(roots, &partitioner))
279            .into_deploy::<HydroDeploy>();
280
281        hydro_build_utils::assert_debug_snapshot!(built.ir());
282
283        for (id, ir) in built.preview_compile().all_dfir() {
284            hydro_build_utils::insta::with_settings!({
285                snapshot_suffix => format!("surface_graph_{id}")
286            }, {
287                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
288            });
289        }
290    }
291}