1use hydro_lang::location::cluster::CLUSTER_SELF_ID;
2use hydro_lang::location::{MemberId, MembershipEvent};
3use hydro_lang::*;
4use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
5use stageleft::IntoQuotedMut;
6
7pub fn partition<'a, F: Fn((MemberId<()>, String)) -> (MemberId<()>, String) + 'a>(
8 cluster1: Cluster<'a, ()>,
9 cluster2: Cluster<'a, ()>,
10 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
11) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
12 cluster1
13 .source_iter(q!(vec!(CLUSTER_SELF_ID)))
14 .map(q!(move |id| (
15 MemberId::<()>::from_raw(id.raw_id),
16 format!("Hello from {}", id.raw_id)
17 )))
18 .send_partitioned(&cluster2, dist_policy)
19 .for_each(q!(move |message| println!(
20 "My self id is {}, my message is {:?}",
21 CLUSTER_SELF_ID.raw_id, message
22 )));
23 (cluster1, cluster2)
24}
25
26pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
27 let cluster1 = flow.cluster();
28 let cluster2 = flow.cluster();
29 cluster1
30 .source_iter(q!(vec!(CLUSTER_SELF_ID)))
31 .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
33 .decouple_cluster(&cluster2)
34 .for_each(q!(move |message| println!(
35 "My self id is {}, my message is {}",
36 CLUSTER_SELF_ID, message
37 )));
38 (cluster1, cluster2)
39}
40
41pub fn decouple_process<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
42 let process1 = flow.process();
43 let process2 = flow.process();
44 process1
45 .source_iter(q!(0..3))
46 .decouple_process(&process2)
47 .for_each(q!(|message| println!("I received message is {}", message)));
48 (process1, process2)
49}
50
51pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
52 let process = flow.process();
53 let cluster = flow.cluster();
54
55 let numbers = process.source_iter(q!(0..5));
56 let ids = process
57 .source_cluster_members(&cluster)
58 .entries()
59 .filter_map(q!(|(i, e)| match e {
60 MembershipEvent::Joined => Some(i),
61 MembershipEvent::Left => None,
62 }));
63
64 ids.cross_product(numbers)
65 .map(q!(|(id, n)| (id, (id, n))))
66 .demux_bincode(&cluster)
67 .inspect(q!(move |n| println!(
68 "cluster received: {:?} (self cluster id: {})",
69 n, CLUSTER_SELF_ID
70 )))
71 .send_bincode(&process)
72 .entries()
73 .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
74
75 (process, cluster)
76}
77
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hydro_deploy::Deployment;
    use hydro_lang::Location;
    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy};
    use hydro_lang::location::MemberId;
    use hydro_lang::rewrites::persist_pullup;
    use hydro_optimize::partitioner::{self, Partitioner};
    use stageleft::q;

    /// Snapshots the IR produced by `super::simple_cluster`.
    #[test]
    fn simple_cluster_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let (_, _) = super::simple_cluster(&builder);
        let built = builder.finalize();

        hydro_build_utils::assert_debug_snapshot!(built.ir());
    }

    /// Deploys `super::simple_cluster` on localhost (one process, two cluster
    /// members) and checks the lines printed on both sides.
    #[tokio::test]
    async fn simple_cluster() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (node, cluster) = super::simple_cluster(&builder);

        let optimized = builder.with_default_optimize();
        let nodes = optimized
            .with_process(&node, deployment.Localhost())
            .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Subscribe to stdout before starting so no line is missed.
        let mut node_stdout = nodes.get_process(&node).stdout().await;
        let cluster_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Each cluster member should log the five numbers addressed to it, in order.
        for (member_idx, mut stdout) in cluster_stdouts.into_iter().enumerate() {
            for number in 0..5 {
                let expected = format!(
                    "cluster received: (MemberId::<()>({}), {}) (self cluster id: MemberId::<()>({}))",
                    member_idx, number, member_idx
                );
                assert_eq!(stdout.recv().await.unwrap(), expected);
            }
        }

        // The process receives 2 * 5 lines in arbitrary order; sort before comparing.
        let mut node_lines = Vec::new();
        for _ in 0..10 {
            node_lines.push(node_stdout.recv().await.unwrap());
        }
        node_lines.sort();

        for (position, line) in node_lines.into_iter().enumerate() {
            let member_idx = position / 5;
            let number = position % 5;
            let expected = format!(
                "node received: (MemberId::<()>({}), (MemberId::<()>({}), {}))",
                member_idx, member_idx, number
            );
            assert_eq!(line, expected);
        }
    }

    /// Deploys `super::decouple_process` on localhost and checks that the
    /// second process prints the three forwarded values in order.
    #[tokio::test]
    async fn decouple_process() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (process1, process2) = super::decouple_process(&builder);

        let nodes = builder
            .with_default_optimize()
            .with_process(&process1, deployment.Localhost())
            .with_process(&process2, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();
        let mut process2_stdout = nodes.get_process(&process2).stdout().await;
        deployment.start().await.unwrap();

        for value in 0..3 {
            assert_eq!(
                process2_stdout.recv().await.unwrap(),
                format!("I received message is {}", value)
            );
        }
    }

    /// Deploys `super::decouple_cluster` with three members per cluster and
    /// checks the first line printed by every member of the second cluster.
    #[tokio::test]
    async fn decouple_cluster() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let (cluster1, cluster2) = super::decouple_cluster(&builder);

        let nodes = builder
            .with_default_optimize()
            .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
            .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let cluster2_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster2)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Exactly one line is checked per receiving member.
        for (member_idx, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
            let expected_message = format!(
                "My self id is MemberId::<()>({}), my message is MemberId::<()>({})",
                member_idx, member_idx
            );
            assert_eq!(stdout.recv().await.unwrap(), expected_message);
        }
    }

    /// Deploys `super::partition` with a policy that multiplies the target raw
    /// id by `num_partitions`, then checks the line printed by every member of
    /// the second cluster whose index is a multiple of `num_partitions`.
    #[tokio::test]
    async fn partition() {
        let mut deployment = Deployment::new();

        let num_nodes = 3;
        let num_partitions = 2;
        let builder = hydro_lang::FlowBuilder::new();
        let (cluster1, cluster2) = super::partition(
            builder.cluster::<()>(),
            builder.cluster::<()>(),
            q!(move |(id, msg)| (
                MemberId::<()>::from_raw(id.raw_id * num_partitions as u32),
                msg
            )),
        );

        let nodes = builder
            .with_default_optimize()
            .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
            .with_cluster(
                &cluster2,
                (0..num_nodes * num_partitions).map(|_| deployment.Localhost()),
            )
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let cluster2_stdouts = futures::future::join_all(
            nodes
                .get_cluster(&cluster2)
                .members()
                .iter()
                .map(|member| member.stdout()),
        )
        .await;

        deployment.start().await.unwrap();

        // Only members at indices divisible by `num_partitions` are checked.
        let checked_members = cluster2_stdouts
            .into_iter()
            .enumerate()
            .filter(|(cluster2_id, _)| cluster2_id % num_partitions == 0);

        for (cluster2_id, mut stdout) in checked_members {
            let sender_id = cluster2_id / num_partitions;
            let expected_message = format!(
                r#"My self id is {}, my message is "Hello from {}""#,
                cluster2_id, sender_id
            );
            assert_eq!(stdout.recv().await.unwrap(), expected_message);
        }
    }

    /// Applies `persist_pullup` and the partitioner rewrite to the
    /// `super::simple_cluster` flow, then snapshots the resulting IR and the
    /// surface syntax of every compiled DFIR graph.
    #[test]
    fn partitioned_simple_cluster_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let (_, cluster) = super::simple_cluster(&builder);

        let nodes_to_partition = HashMap::from([(5, vec![String::from("1")])]);
        let location_id = cluster.id().raw_id();
        let partitioner = Partitioner {
            nodes_to_partition,
            num_partitions: 3,
            location_id,
            new_cluster_id: None,
        };

        let built = builder
            .optimize_with(persist_pullup::persist_pullup)
            .optimize_with(|roots| partitioner::partition(roots, &partitioner))
            .into_deploy::<HydroDeploy>();

        hydro_build_utils::assert_debug_snapshot!(built.ir());

        // One surface-syntax snapshot per graph, distinguished by a suffix built from `id`.
        for (id, ir) in built.preview_compile().all_dfir() {
            hydro_build_utils::insta::with_settings!({
                snapshot_suffix => format!("surface_graph_{id}")
            }, {
                hydro_build_utils::assert_snapshot!(ir.surface_syntax_string());
            });
        }
    }
}