1use hydro_lang::*;
2
3pub fn test_difference<'a>(
4 process: &Process<'a>,
5 persist1: bool,
6 persist2: bool,
7 tick_trigger: Stream<(), Process<'a>, Unbounded>,
8) -> Stream<u32, Process<'a>, Unbounded> {
9 let tick = process.tick();
10
11 let mut source = process
12 .source_iter(q!(0..5))
13 .batch(&tick, nondet!())
14 .continue_if(
15 tick_trigger
16 .clone()
17 .batch(&tick, nondet!())
18 .first(),
19 );
20 if persist1 {
21 source = source.persist();
22 }
23
24 let mut source2 = process
25 .source_iter(q!(3..6))
26 .batch(&tick, nondet!())
27 .continue_if(
28 tick_trigger
29 .clone()
30 .batch(&tick, nondet!())
31 .first(),
32 );
33 if persist2 {
34 source2 = source2.persist();
35 }
36
37 source
38 .filter_not_in(source2)
39 .continue_if(tick_trigger.batch(&tick, nondet!()).first())
40 .all_ticks()
41}
42
43pub fn test_anti_join<'a>(
44 process: &Process<'a>,
45 persist1: bool,
46 persist2: bool,
47 tick_trigger: Stream<(), Process<'a>, Unbounded>,
48) -> Stream<u32, Process<'a>, Unbounded> {
49 let tick = process.tick();
50
51 let mut source = process
52 .source_iter(q!(0..5))
53 .map(q!(|v| (v, v)))
54 .batch(&tick, nondet!())
55 .continue_if(
56 tick_trigger
57 .clone()
58 .batch(&tick, nondet!())
59 .first(),
60 );
61 if persist1 {
62 source = source.persist();
63 }
64
65 let mut source2 = process
66 .source_iter(q!(3..6))
67 .batch(&tick, nondet!())
68 .continue_if(
69 tick_trigger
70 .clone()
71 .batch(&tick, nondet!())
72 .first(),
73 );
74 if persist2 {
75 source2 = source2.persist();
76 }
77
78 source
79 .anti_join(source2)
80 .continue_if(tick_trigger.batch(&tick, nondet!()).first())
81 .all_ticks()
82 .map(q!(|v| v.0))
83}
84
85#[cfg(test)]
86mod tests {
87 use futures::{SinkExt, Stream, StreamExt};
88 use hydro_deploy::Deployment;
89 use hydro_lang::Location;
90
91 async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
92 let mut out = Vec::with_capacity(n);
93 for _ in 0..n {
94 if let Some(item) = stream.next().await {
95 out.push(item);
96 } else {
97 panic!();
98 }
99 }
100 out
101 }
102
103 #[tokio::test]
104 async fn test_difference_tick_tick() {
105 let mut deployment = Deployment::new();
106
107 let builder = hydro_lang::FlowBuilder::new();
108 let external = builder.external::<()>();
109 let p1 = builder.process();
110
111 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
112
113 let out = super::test_difference(&p1, false, false, tick_trigger);
114 let out_recv = out.send_bincode_external(&external);
115
116 let built = builder.with_default_optimize();
117 let nodes = built
118 .with_process(&p1, deployment.Localhost())
119 .with_external(&external, deployment.Localhost())
120 .deploy(&mut deployment);
121
122 deployment.deploy().await.unwrap();
123
124 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
125 let out_recv = nodes.connect_source_bincode(out_recv).await;
126
127 tick_send.send(()).await.unwrap();
128
129 deployment.start().await.unwrap();
130
131 assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
132 }
133
134 #[tokio::test]
135 async fn test_difference_tick_static() {
136 let mut deployment = Deployment::new();
137
138 let builder = hydro_lang::FlowBuilder::new();
139 let external = builder.external::<()>();
140 let p1 = builder.process();
141
142 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
143
144 let out = super::test_difference(&p1, false, true, tick_trigger);
145 let out_recv = out.send_bincode_external(&external);
146
147 let built = builder.with_default_optimize();
148 let nodes = built
149 .with_process(&p1, deployment.Localhost())
150 .with_external(&external, deployment.Localhost())
151 .deploy(&mut deployment);
152
153 deployment.deploy().await.unwrap();
154
155 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
156 let out_recv = nodes.connect_source_bincode(out_recv).await;
157
158 tick_send.send(()).await.unwrap();
159
160 deployment.start().await.unwrap();
161
162 assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
163 }
164
165 #[tokio::test]
166 async fn test_difference_static_tick() {
167 let mut deployment = Deployment::new();
168
169 let builder = hydro_lang::FlowBuilder::new();
170 let external = builder.external::<()>();
171 let p1 = builder.process();
172
173 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
174
175 let out = super::test_difference(&p1, true, false, tick_trigger);
176 let out_recv = out.send_bincode_external(&external);
177
178 let built = builder.with_default_optimize();
179 let nodes = built
180 .with_process(&p1, deployment.Localhost())
181 .with_external(&external, deployment.Localhost())
182 .deploy(&mut deployment);
183
184 deployment.deploy().await.unwrap();
185
186 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
187 let mut out_recv = nodes.connect_source_bincode(out_recv).await;
188
189 tick_send.send(()).await.unwrap();
190
191 deployment.start().await.unwrap();
192
193 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
194
195 tick_send.send(()).await.unwrap();
196
197 assert_eq!(take_next_n(&mut out_recv, 5).await, &[0, 1, 2, 3, 4]);
198 }
199
200 #[tokio::test]
201 async fn test_difference_static_static() {
202 let mut deployment = Deployment::new();
203
204 let builder = hydro_lang::FlowBuilder::new();
205 let external = builder.external::<()>();
206 let p1 = builder.process();
207
208 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
209
210 let out = super::test_difference(&p1, true, true, tick_trigger);
211 let out_recv = out.send_bincode_external(&external);
212
213 let built = builder.with_default_optimize();
214 let nodes = built
215 .with_process(&p1, deployment.Localhost())
216 .with_external(&external, deployment.Localhost())
217 .deploy(&mut deployment);
218
219 deployment.deploy().await.unwrap();
220
221 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
222 let mut out_recv = nodes.connect_source_bincode(out_recv).await;
223
224 tick_send.send(()).await.unwrap();
225
226 deployment.start().await.unwrap();
227
228 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
229
230 tick_send.send(()).await.unwrap();
231
232 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
233 }
234
235 #[tokio::test]
236 async fn test_anti_join_tick_tick() {
237 let mut deployment = Deployment::new();
238
239 let builder = hydro_lang::FlowBuilder::new();
240 let external = builder.external::<()>();
241 let p1 = builder.process();
242
243 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
244
245 let out = super::test_anti_join(&p1, false, false, tick_trigger);
246 let out_recv = out.send_bincode_external(&external);
247
248 let built = builder.with_default_optimize();
249 let nodes = built
250 .with_process(&p1, deployment.Localhost())
251 .with_external(&external, deployment.Localhost())
252 .deploy(&mut deployment);
253
254 deployment.deploy().await.unwrap();
255
256 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
257 let out_recv = nodes.connect_source_bincode(out_recv).await;
258
259 tick_send.send(()).await.unwrap();
260
261 deployment.start().await.unwrap();
262
263 assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
264 }
265
266 #[tokio::test]
267 async fn test_anti_join_tick_static() {
268 let mut deployment = Deployment::new();
269
270 let builder = hydro_lang::FlowBuilder::new();
271 let external = builder.external::<()>();
272 let p1 = builder.process();
273
274 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
275
276 let out = super::test_anti_join(&p1, false, true, tick_trigger);
277 let out_recv = out.send_bincode_external(&external);
278
279 let built = builder.with_default_optimize();
280 let nodes = built
281 .with_process(&p1, deployment.Localhost())
282 .with_external(&external, deployment.Localhost())
283 .deploy(&mut deployment);
284
285 deployment.deploy().await.unwrap();
286
287 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
288 let out_recv = nodes.connect_source_bincode(out_recv).await;
289
290 tick_send.send(()).await.unwrap();
291
292 deployment.start().await.unwrap();
293
294 assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
295 }
296
297 #[tokio::test]
298 async fn test_anti_join_static_tick() {
299 let mut deployment = Deployment::new();
300
301 let builder = hydro_lang::FlowBuilder::new();
302 let external = builder.external::<()>();
303 let p1 = builder.process();
304
305 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
306
307 let out = super::test_anti_join(&p1, true, false, tick_trigger);
308 let out_recv = out.send_bincode_external(&external);
309
310 let built = builder.with_default_optimize();
311 let nodes = built
312 .with_process(&p1, deployment.Localhost())
313 .with_external(&external, deployment.Localhost())
314 .deploy(&mut deployment);
315
316 deployment.deploy().await.unwrap();
317
318 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
319 let mut out_recv = nodes.connect_source_bincode(out_recv).await;
320
321 tick_send.send(()).await.unwrap();
322
323 deployment.start().await.unwrap();
324
325 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
326
327 tick_send.send(()).await.unwrap();
328
329 assert_eq!(take_next_n(&mut out_recv, 5).await, &[0, 1, 2, 3, 4]);
330 }
331
332 #[tokio::test]
333 async fn test_anti_join_static_static() {
334 let mut deployment = Deployment::new();
335
336 let builder = hydro_lang::FlowBuilder::new();
337 let external = builder.external::<()>();
338 let p1 = builder.process();
339
340 let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
341
342 let out = super::test_anti_join(&p1, true, true, tick_trigger);
343 let out_recv = out.send_bincode_external(&external);
344
345 let built = builder.with_default_optimize();
346 let nodes = built
347 .with_process(&p1, deployment.Localhost())
348 .with_external(&external, deployment.Localhost())
349 .deploy(&mut deployment);
350
351 deployment.deploy().await.unwrap();
352
353 let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
354 let mut out_recv = nodes.connect_source_bincode(out_recv).await;
355
356 tick_send.send(()).await.unwrap();
357
358 deployment.start().await.unwrap();
359
360 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
361
362 tick_send.send(()).await.unwrap();
363
364 assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
365 }
366}