1use hydro_lang::*;
2use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
3use stageleft::IntoQuotedMut;
4
5pub fn partition<'a, F: Fn((ClusterId<()>, String)) -> (ClusterId<()>, String) + 'a>(
6 cluster1: Cluster<'a, ()>,
7 cluster2: Cluster<'a, ()>,
8 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
9) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
10 cluster1
11 .source_iter(q!(vec!(CLUSTER_SELF_ID)))
12 .map(q!(move |id| (
13 ClusterId::<()>::from_raw(id.raw_id),
14 format!("Hello from {}", id.raw_id)
15 )))
16 .send_partitioned(&cluster2, dist_policy)
17 .for_each(q!(move |message| println!(
18 "My self id is {}, my message is {:?}",
19 CLUSTER_SELF_ID.raw_id, message
20 )));
21 (cluster1, cluster2)
22}
23
24pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
25 let cluster1 = flow.cluster();
26 let cluster2 = flow.cluster();
27 cluster1
28 .source_iter(q!(vec!(CLUSTER_SELF_ID)))
29 .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
31 .decouple_cluster(&cluster2)
32 .for_each(q!(move |message| println!(
33 "My self id is {}, my message is {}",
34 CLUSTER_SELF_ID, message
35 )));
36 (cluster1, cluster2)
37}
38
39pub fn decouple_process<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
40 let process1 = flow.process();
41 let process2 = flow.process();
42 process1
43 .source_iter(q!(0..3))
44 .decouple_process(&process2)
45 .for_each(q!(|message| println!("I received message is {}", message)));
46 (process1, process2)
47}
48
49pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
50 let process = flow.process();
51 let cluster = flow.cluster();
52
53 let numbers = process.source_iter(q!(0..5));
54 let ids = process.source_iter(cluster.members()).map(q!(|&id| id));
55
56 ids.cross_product(numbers)
57 .map(q!(|(id, n)| (id, (id, n))))
58 .send_bincode(&cluster)
59 .inspect(q!(move |n| println!(
60 "cluster received: {:?} (self cluster id: {})",
61 n, CLUSTER_SELF_ID
62 )))
63 .send_bincode(&process)
64 .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
65
66 (process, cluster)
67}
68
69#[cfg(test)]
70mod tests {
71 use std::collections::HashMap;
72
73 use hydro_deploy::Deployment;
74 use hydro_lang::deploy::{DeployCrateWrapper, DeployRuntime};
75 use hydro_lang::rewrites::partitioner::{self, PartitionAttribute, Partitioner};
76 use hydro_lang::rewrites::{insert_counter, persist_pullup};
77 use hydro_lang::{ClusterId, Location};
78 use stageleft::{RuntimeData, q};
79
80 #[tokio::test]
81 async fn simple_cluster() {
82 let mut deployment = Deployment::new();
83
84 let builder = hydro_lang::FlowBuilder::new();
85 let (node, cluster) = super::simple_cluster(&builder);
86 let built = builder.with_default_optimize();
87
88 insta::assert_debug_snapshot!(built.ir());
89
90 let nodes = built
91 .with_process(&node, deployment.Localhost())
92 .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
93 .deploy(&mut deployment);
94
95 deployment.deploy().await.unwrap();
96
97 let mut node_stdout = nodes.get_process(&node).stdout().await;
98 let cluster_stdouts = futures::future::join_all(
99 nodes
100 .get_cluster(&cluster)
101 .members()
102 .iter()
103 .map(|node| node.stdout()),
104 )
105 .await;
106
107 deployment.start().await.unwrap();
108
109 for (i, mut stdout) in cluster_stdouts.into_iter().enumerate() {
110 for j in 0..5 {
111 assert_eq!(
112 stdout.recv().await.unwrap(),
113 format!(
114 "cluster received: (ClusterId::<()>({}), {}) (self cluster id: ClusterId::<()>({}))",
115 i, j, i
116 )
117 );
118 }
119 }
120
121 let mut node_outs = vec![];
122 for _i in 0..10 {
123 node_outs.push(node_stdout.recv().await.unwrap());
124 }
125 node_outs.sort();
126
127 for (i, n) in node_outs.into_iter().enumerate() {
128 assert_eq!(
129 n,
130 format!(
131 "node received: (ClusterId::<()>({}), (ClusterId::<()>({}), {}))",
132 i / 5,
133 i / 5,
134 i % 5
135 )
136 );
137 }
138 }
139
140 #[tokio::test]
141 async fn decouple_process() {
142 let mut deployment = Deployment::new();
143
144 let builder = hydro_lang::FlowBuilder::new();
145 let (process1, process2) = super::decouple_process(&builder);
146 let built = builder.with_default_optimize();
147
148 let nodes = built
149 .with_process(&process1, deployment.Localhost())
150 .with_process(&process2, deployment.Localhost())
151 .deploy(&mut deployment);
152
153 deployment.deploy().await.unwrap();
154 let mut process2_stdout = nodes.get_process(&process2).stdout().await;
155 deployment.start().await.unwrap();
156 for i in 0..3 {
157 let expected_message = format!("I received message is {}", i);
158 assert_eq!(process2_stdout.recv().await.unwrap(), expected_message);
159 }
160 }
161
162 #[tokio::test]
163 async fn decouple_cluster() {
164 let mut deployment = Deployment::new();
165
166 let builder = hydro_lang::FlowBuilder::new();
167 let (cluster1, cluster2) = super::decouple_cluster(&builder);
168 let built = builder.with_default_optimize();
169
170 let nodes = built
171 .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
172 .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
173 .deploy(&mut deployment);
174
175 deployment.deploy().await.unwrap();
176
177 let cluster2_stdouts = futures::future::join_all(
178 nodes
179 .get_cluster(&cluster2)
180 .members()
181 .iter()
182 .map(|node| node.stdout()),
183 )
184 .await;
185
186 deployment.start().await.unwrap();
187
188 for (i, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
189 for _j in 0..1 {
190 let expected_message = format!(
191 "My self id is ClusterId::<()>({}), my message is ClusterId::<()>({})",
192 i, i
193 );
194 assert_eq!(stdout.recv().await.unwrap(), expected_message);
195 }
196 }
197 }
198
199 #[tokio::test]
200 async fn partition() {
201 let mut deployment = Deployment::new();
202
203 let num_nodes = 3;
204 let num_partitions = 2;
205 let builder = hydro_lang::FlowBuilder::new();
206 let (cluster1, cluster2) = super::partition(
207 builder.cluster::<()>(),
208 builder.cluster::<()>(),
209 q!(move |(id, msg)| (
210 ClusterId::<()>::from_raw(id.raw_id * num_partitions as u32),
211 msg
212 )),
213 );
214 let built = builder.with_default_optimize();
215
216 let nodes = built
217 .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
218 .with_cluster(
219 &cluster2,
220 (0..num_nodes * num_partitions).map(|_| deployment.Localhost()),
221 )
222 .deploy(&mut deployment);
223
224 deployment.deploy().await.unwrap();
225
226 let cluster2_stdouts = futures::future::join_all(
227 nodes
228 .get_cluster(&cluster2)
229 .members()
230 .iter()
231 .map(|node| node.stdout()),
232 )
233 .await;
234
235 deployment.start().await.unwrap();
236
237 for (cluster2_id, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
238 if cluster2_id % num_partitions == 0 {
239 let expected_message = format!(
240 r#"My self id is {}, my message is "Hello from {}""#,
241 cluster2_id,
242 cluster2_id / num_partitions
243 );
244 assert_eq!(stdout.recv().await.unwrap(), expected_message);
245 }
246 }
247 }
248
249 #[test]
250 fn partitioned_simple_cluster_ir() {
251 let builder = hydro_lang::FlowBuilder::new();
252 let (_, cluster) = super::simple_cluster(&builder);
253 let partitioner = Partitioner {
254 nodes_to_partition: HashMap::from([(5, PartitionAttribute::TupleIndex(1))]),
255 num_partitions: 3,
256 partitioned_cluster_id: cluster.id().raw_id(),
257 };
258 let built = builder
259 .optimize_with(persist_pullup::persist_pullup)
260 .optimize_with(|leaves| partitioner::partition(leaves, &partitioner))
261 .into_deploy::<DeployRuntime>();
262
263 insta::assert_debug_snapshot!(built.ir());
264
265 for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
266 insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
267 insta::assert_snapshot!(ir.surface_syntax_string());
268 });
269 }
270 }
271
272 #[test]
273 fn counter_simple_cluster_ir() {
274 let builder = hydro_lang::FlowBuilder::new();
275 let _ = super::simple_cluster(&builder);
276 let counter_output_duration = q!(std::time::Duration::from_secs(1));
277 let built = builder
278 .optimize_with(persist_pullup::persist_pullup)
279 .optimize_with(|leaves| insert_counter::insert_counter(leaves, counter_output_duration))
280 .into_deploy::<DeployRuntime>();
281
282 insta::assert_debug_snapshot!(built.ir());
283
284 for (id, ir) in built.compile(&RuntimeData::new("FAKE")).all_dfir() {
285 insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
286 insta::assert_snapshot!(ir.surface_syntax_string());
287 });
288 }
289 }
290}