hydro_test/local/
negation.rs

1use hydro_lang::*;
2
3pub fn test_difference<'a>(
4    process: &Process<'a>,
5    persist1: bool,
6    persist2: bool,
7    tick_trigger: Stream<(), Process<'a>, Unbounded>,
8) -> Stream<u32, Process<'a>, Unbounded> {
9    let tick = process.tick();
10
11    let mut source = process
12        .source_iter(q!(0..5))
13        .batch(&tick, nondet!(/** test */))
14        .continue_if(
15            tick_trigger
16                .clone()
17                .batch(&tick, nondet!(/** test */))
18                .first(),
19        );
20    if persist1 {
21        source = source.persist();
22    }
23
24    let mut source2 = process
25        .source_iter(q!(3..6))
26        .batch(&tick, nondet!(/** test */))
27        .continue_if(
28            tick_trigger
29                .clone()
30                .batch(&tick, nondet!(/** test */))
31                .first(),
32        );
33    if persist2 {
34        source2 = source2.persist();
35    }
36
37    source
38        .filter_not_in(source2)
39        .continue_if(tick_trigger.batch(&tick, nondet!(/** test */)).first())
40        .all_ticks()
41}
42
43pub fn test_anti_join<'a>(
44    process: &Process<'a>,
45    persist1: bool,
46    persist2: bool,
47    tick_trigger: Stream<(), Process<'a>, Unbounded>,
48) -> Stream<u32, Process<'a>, Unbounded> {
49    let tick = process.tick();
50
51    let mut source = process
52        .source_iter(q!(0..5))
53        .map(q!(|v| (v, v)))
54        .batch(&tick, nondet!(/** test */))
55        .continue_if(
56            tick_trigger
57                .clone()
58                .batch(&tick, nondet!(/** test */))
59                .first(),
60        );
61    if persist1 {
62        source = source.persist();
63    }
64
65    let mut source2 = process
66        .source_iter(q!(3..6))
67        .batch(&tick, nondet!(/** test */))
68        .continue_if(
69            tick_trigger
70                .clone()
71                .batch(&tick, nondet!(/** test */))
72                .first(),
73        );
74    if persist2 {
75        source2 = source2.persist();
76    }
77
78    source
79        .anti_join(source2)
80        .continue_if(tick_trigger.batch(&tick, nondet!(/** test */)).first())
81        .all_ticks()
82        .map(q!(|v| v.0))
83}
84
85#[cfg(test)]
86mod tests {
87    use futures::{SinkExt, Stream, StreamExt};
88    use hydro_deploy::Deployment;
89    use hydro_lang::Location;
90
91    async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
92        let mut out = Vec::with_capacity(n);
93        for _ in 0..n {
94            if let Some(item) = stream.next().await {
95                out.push(item);
96            } else {
97                panic!();
98            }
99        }
100        out
101    }
102
103    #[tokio::test]
104    async fn test_difference_tick_tick() {
105        let mut deployment = Deployment::new();
106
107        let builder = hydro_lang::FlowBuilder::new();
108        let external = builder.external::<()>();
109        let p1 = builder.process();
110
111        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
112
113        let out = super::test_difference(&p1, false, false, tick_trigger);
114        let out_recv = out.send_bincode_external(&external);
115
116        let built = builder.with_default_optimize();
117        let nodes = built
118            .with_process(&p1, deployment.Localhost())
119            .with_external(&external, deployment.Localhost())
120            .deploy(&mut deployment);
121
122        deployment.deploy().await.unwrap();
123
124        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
125        let out_recv = nodes.connect_source_bincode(out_recv).await;
126
127        tick_send.send(()).await.unwrap();
128
129        deployment.start().await.unwrap();
130
131        assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
132    }
133
134    #[tokio::test]
135    async fn test_difference_tick_static() {
136        let mut deployment = Deployment::new();
137
138        let builder = hydro_lang::FlowBuilder::new();
139        let external = builder.external::<()>();
140        let p1 = builder.process();
141
142        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
143
144        let out = super::test_difference(&p1, false, true, tick_trigger);
145        let out_recv = out.send_bincode_external(&external);
146
147        let built = builder.with_default_optimize();
148        let nodes = built
149            .with_process(&p1, deployment.Localhost())
150            .with_external(&external, deployment.Localhost())
151            .deploy(&mut deployment);
152
153        deployment.deploy().await.unwrap();
154
155        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
156        let out_recv = nodes.connect_source_bincode(out_recv).await;
157
158        tick_send.send(()).await.unwrap();
159
160        deployment.start().await.unwrap();
161
162        assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
163    }
164
165    #[tokio::test]
166    async fn test_difference_static_tick() {
167        let mut deployment = Deployment::new();
168
169        let builder = hydro_lang::FlowBuilder::new();
170        let external = builder.external::<()>();
171        let p1 = builder.process();
172
173        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
174
175        let out = super::test_difference(&p1, true, false, tick_trigger);
176        let out_recv = out.send_bincode_external(&external);
177
178        let built = builder.with_default_optimize();
179        let nodes = built
180            .with_process(&p1, deployment.Localhost())
181            .with_external(&external, deployment.Localhost())
182            .deploy(&mut deployment);
183
184        deployment.deploy().await.unwrap();
185
186        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
187        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
188
189        tick_send.send(()).await.unwrap();
190
191        deployment.start().await.unwrap();
192
193        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
194
195        tick_send.send(()).await.unwrap();
196
197        assert_eq!(take_next_n(&mut out_recv, 5).await, &[0, 1, 2, 3, 4]);
198    }
199
200    #[tokio::test]
201    async fn test_difference_static_static() {
202        let mut deployment = Deployment::new();
203
204        let builder = hydro_lang::FlowBuilder::new();
205        let external = builder.external::<()>();
206        let p1 = builder.process();
207
208        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
209
210        let out = super::test_difference(&p1, true, true, tick_trigger);
211        let out_recv = out.send_bincode_external(&external);
212
213        let built = builder.with_default_optimize();
214        let nodes = built
215            .with_process(&p1, deployment.Localhost())
216            .with_external(&external, deployment.Localhost())
217            .deploy(&mut deployment);
218
219        deployment.deploy().await.unwrap();
220
221        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
222        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
223
224        tick_send.send(()).await.unwrap();
225
226        deployment.start().await.unwrap();
227
228        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
229
230        tick_send.send(()).await.unwrap();
231
232        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
233    }
234
235    #[tokio::test]
236    async fn test_anti_join_tick_tick() {
237        let mut deployment = Deployment::new();
238
239        let builder = hydro_lang::FlowBuilder::new();
240        let external = builder.external::<()>();
241        let p1 = builder.process();
242
243        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
244
245        let out = super::test_anti_join(&p1, false, false, tick_trigger);
246        let out_recv = out.send_bincode_external(&external);
247
248        let built = builder.with_default_optimize();
249        let nodes = built
250            .with_process(&p1, deployment.Localhost())
251            .with_external(&external, deployment.Localhost())
252            .deploy(&mut deployment);
253
254        deployment.deploy().await.unwrap();
255
256        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
257        let out_recv = nodes.connect_source_bincode(out_recv).await;
258
259        tick_send.send(()).await.unwrap();
260
261        deployment.start().await.unwrap();
262
263        assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
264    }
265
266    #[tokio::test]
267    async fn test_anti_join_tick_static() {
268        let mut deployment = Deployment::new();
269
270        let builder = hydro_lang::FlowBuilder::new();
271        let external = builder.external::<()>();
272        let p1 = builder.process();
273
274        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
275
276        let out = super::test_anti_join(&p1, false, true, tick_trigger);
277        let out_recv = out.send_bincode_external(&external);
278
279        let built = builder.with_default_optimize();
280        let nodes = built
281            .with_process(&p1, deployment.Localhost())
282            .with_external(&external, deployment.Localhost())
283            .deploy(&mut deployment);
284
285        deployment.deploy().await.unwrap();
286
287        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
288        let out_recv = nodes.connect_source_bincode(out_recv).await;
289
290        tick_send.send(()).await.unwrap();
291
292        deployment.start().await.unwrap();
293
294        assert_eq!(out_recv.take(3).collect::<Vec<_>>().await, &[0, 1, 2]);
295    }
296
297    #[tokio::test]
298    async fn test_anti_join_static_tick() {
299        let mut deployment = Deployment::new();
300
301        let builder = hydro_lang::FlowBuilder::new();
302        let external = builder.external::<()>();
303        let p1 = builder.process();
304
305        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
306
307        let out = super::test_anti_join(&p1, true, false, tick_trigger);
308        let out_recv = out.send_bincode_external(&external);
309
310        let built = builder.with_default_optimize();
311        let nodes = built
312            .with_process(&p1, deployment.Localhost())
313            .with_external(&external, deployment.Localhost())
314            .deploy(&mut deployment);
315
316        deployment.deploy().await.unwrap();
317
318        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
319        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
320
321        tick_send.send(()).await.unwrap();
322
323        deployment.start().await.unwrap();
324
325        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
326
327        tick_send.send(()).await.unwrap();
328
329        assert_eq!(take_next_n(&mut out_recv, 5).await, &[0, 1, 2, 3, 4]);
330    }
331
332    #[tokio::test]
333    async fn test_anti_join_static_static() {
334        let mut deployment = Deployment::new();
335
336        let builder = hydro_lang::FlowBuilder::new();
337        let external = builder.external::<()>();
338        let p1 = builder.process();
339
340        let (tick_send, tick_trigger) = p1.source_external_bincode(&external);
341
342        let out = super::test_anti_join(&p1, true, true, tick_trigger);
343        let out_recv = out.send_bincode_external(&external);
344
345        let built = builder.with_default_optimize();
346        let nodes = built
347            .with_process(&p1, deployment.Localhost())
348            .with_external(&external, deployment.Localhost())
349            .deploy(&mut deployment);
350
351        deployment.deploy().await.unwrap();
352
353        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
354        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
355
356        tick_send.send(()).await.unwrap();
357
358        deployment.start().await.unwrap();
359
360        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
361
362        tick_send.send(()).await.unwrap();
363
364        assert_eq!(take_next_n(&mut out_recv, 3).await, &[0, 1, 2]);
365    }
366}