1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::location::cluster::CLUSTER_SELF_ID;
3use hydro_lang::location::{MemberId, MembershipEvent};
4use hydro_lang::prelude::*;
5use hydro_std::compartmentalize::{DecoupleClusterStream, DecoupleProcessStream, PartitionStream};
6use stageleft::IntoQuotedMut;
7
8pub fn partition<'a, F: Fn((MemberId<()>, String)) -> (MemberId<()>, String) + 'a>(
9 cluster1: Cluster<'a, ()>,
10 cluster2: Cluster<'a, ()>,
11 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, ()>>,
12) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
13 cluster1
14 .source_stream(q!(tokio_stream::iter(vec!(CLUSTER_SELF_ID))))
15 .map(q!(move |id| (id.clone(), format!("Hello from {}", id))))
16 .send_partitioned(&cluster2, dist_policy)
17 .assume_ordering::<TotalOrder>(nondet!())
18 .for_each(q!(move |message| println!(
19 "My self id is {}, my message is {:?}",
20 CLUSTER_SELF_ID, message
21 )));
22 (cluster1, cluster2)
23}
24
25pub fn decouple_cluster<'a>(flow: &mut FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) {
26 let cluster1 = flow.cluster();
27 let cluster2 = flow.cluster();
28 cluster1
29 .source_iter(q!(vec!(CLUSTER_SELF_ID)))
30 .inspect(q!(|message| println!("Cluster1 node sending message: {}", message)))
32 .decouple_cluster(&cluster2)
33 .for_each(q!(move |message| println!(
34 "My self id is {}, my message is {}",
35 CLUSTER_SELF_ID, message
36 )));
37 (cluster1, cluster2)
38}
39
40pub fn decouple_process<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, ()>, Process<'a, ()>) {
41 let process1 = flow.process();
42 let process2 = flow.process();
43 process1
44 .source_iter(q!(0..3))
45 .decouple_process(&process2)
46 .for_each(q!(|message| println!("I received message is {}", message)));
47 (process1, process2)
48}
49
50pub fn simple_cluster<'a>(flow: &mut FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<'a, ()>) {
51 let process = flow.process();
52 let cluster = flow.cluster();
53
54 let numbers = process.source_iter(q!(0..5));
55 let ids = process
56 .source_cluster_members(&cluster)
57 .entries()
58 .filter_map(q!(|(i, e)| match e {
59 MembershipEvent::Joined => Some(i),
60 MembershipEvent::Left => None,
61 }));
62
63 ids.cross_product(numbers.into())
64 .map(q!(|(id, n)| (id.clone(), (id, n))))
65 .demux(&cluster, TCP.fail_stop().bincode())
66 .inspect(q!(move |n| println!(
67 "cluster received: {:?} (self cluster id: {})",
68 n, CLUSTER_SELF_ID
69 )))
70 .send(&process, TCP.fail_stop().bincode())
71 .entries()
72 .assume_ordering::<TotalOrder>(nondet!())
73 .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d)));
74
75 (process, cluster)
76}
77
78#[cfg(test)]
79mod tests {
80 use hydro_deploy::Deployment;
81 use hydro_lang::deploy::DeployCrateWrapper;
82 use hydro_lang::location::MemberId;
83 use stageleft::q;
84
85 #[test]
86 fn simple_cluster_ir() {
87 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
88 let _ = super::simple_cluster(&mut builder);
89 let built = builder.finalize();
90
91 hydro_build_utils::assert_debug_snapshot!(built.ir());
92 }
93
94 #[tokio::test]
95 async fn simple_cluster() {
96 let mut deployment = Deployment::new();
97
98 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
99 let (node, cluster) = super::simple_cluster(&mut builder);
100
101 let nodes = builder
102 .with_default_optimize()
103 .with_process(&node, deployment.Localhost())
104 .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost()))
105 .deploy(&mut deployment);
106
107 deployment.deploy().await.unwrap();
108
109 let mut node_stdout = nodes.get_process(&node).stdout();
110 let cluster_stdouts = nodes
111 .get_cluster(&cluster)
112 .members()
113 .iter()
114 .map(|node| node.stdout())
115 .collect::<Vec<_>>();
116
117 deployment.start().await.unwrap();
118
119 for (i, mut stdout) in cluster_stdouts.into_iter().enumerate() {
120 for j in 0..5 {
121 assert_eq!(
122 stdout.recv().await.unwrap(),
123 format!(
124 "cluster received: (MemberId::<()>({}), {}) (self cluster id: MemberId::<()>({}))",
125 i, j, i
126 )
127 );
128 }
129 }
130
131 let mut node_outs = vec![];
132 for _i in 0..10 {
133 node_outs.push(node_stdout.recv().await.unwrap());
134 }
135 node_outs.sort();
136
137 for (i, n) in node_outs.into_iter().enumerate() {
138 assert_eq!(
139 n,
140 format!(
141 "node received: (MemberId::<()>({}), (MemberId::<()>({}), {}))",
142 i / 5,
143 i / 5,
144 i % 5
145 )
146 );
147 }
148 }
149
150 #[tokio::test]
151 async fn decouple_process() {
152 let mut deployment = Deployment::new();
153
154 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
155 let (process1, process2) = super::decouple_process(&mut builder);
156 let built = builder.with_default_optimize();
157
158 let nodes = built
159 .with_process(&process1, deployment.Localhost())
160 .with_process(&process2, deployment.Localhost())
161 .deploy(&mut deployment);
162
163 deployment.deploy().await.unwrap();
164 let mut process2_stdout = nodes.get_process(&process2).stdout();
165 deployment.start().await.unwrap();
166 for i in 0..3 {
167 let expected_message = format!("I received message is {}", i);
168 assert_eq!(process2_stdout.recv().await.unwrap(), expected_message);
169 }
170 }
171
172 #[tokio::test]
173 async fn decouple_cluster() {
174 let mut deployment = Deployment::new();
175
176 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
177 let (cluster1, cluster2) = super::decouple_cluster(&mut builder);
178 let built = builder.with_default_optimize();
179
180 let nodes = built
181 .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost()))
182 .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost()))
183 .deploy(&mut deployment);
184
185 deployment.deploy().await.unwrap();
186
187 let cluster2_stdouts = nodes
188 .get_cluster(&cluster2)
189 .members()
190 .iter()
191 .map(|node| node.stdout())
192 .collect::<Vec<_>>();
193
194 deployment.start().await.unwrap();
195
196 for (i, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
197 for _j in 0..1 {
198 let expected_message = format!(
199 "My self id is MemberId::<()>({}), my message is MemberId::<()>({})",
200 i, i
201 );
202 assert_eq!(stdout.recv().await.unwrap(), expected_message);
203 }
204 }
205 }
206
207 #[tokio::test]
208 async fn partition() {
209 let mut deployment = Deployment::new();
210
211 let num_nodes = 3;
212 let num_partitions = 2;
213 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
214 let (cluster1, cluster2) = super::partition(
215 builder.cluster::<()>(),
216 builder.cluster::<()>(),
217 q!(move |(id, msg)| (
218 MemberId::<()>::from_raw_id(id.get_raw_id() * num_partitions as u32),
219 msg
220 )),
221 );
222 let built = builder.with_default_optimize();
223
224 let nodes = built
225 .with_cluster(&cluster1, (0..num_nodes).map(|_| deployment.Localhost()))
226 .with_cluster(
227 &cluster2,
228 (0..num_nodes * num_partitions).map(|_| deployment.Localhost()),
229 )
230 .deploy(&mut deployment);
231
232 deployment.deploy().await.unwrap();
233
234 let cluster2_stdouts = nodes
235 .get_cluster(&cluster2)
236 .members()
237 .iter()
238 .map(|node| node.stdout())
239 .collect::<Vec<_>>();
240
241 deployment.start().await.unwrap();
242
243 for (cluster2_id, mut stdout) in cluster2_stdouts.into_iter().enumerate() {
244 if cluster2_id % num_partitions == 0 {
245 let expected_message = format!(
246 r#"My self id is MemberId::<()>({}), my message is "Hello from MemberId::<()>({})""#,
247 cluster2_id,
248 cluster2_id / num_partitions
249 );
250 assert_eq!(stdout.recv().await.unwrap(), expected_message);
251 }
252 }
253 }
254}