hydro_test_local/local/
negation.rs

1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use hydro_lang::deploy::SingleProcessGraph;
3use hydro_lang::dfir_rs::scheduled::graph::Dfir;
4use hydro_lang::*;
5use stageleft::{Quoted, RuntimeData};
6
7#[stageleft::entry]
8pub fn test_difference<'a>(
9    flow: FlowBuilder<'a>,
10    output: RuntimeData<&'a UnboundedSender<u32>>,
11    persist1: bool,
12    persist2: bool,
13) -> impl Quoted<'a, Dfir<'a>> {
14    let process = flow.process::<()>();
15    let tick = process.tick();
16
17    let mut source = unsafe {
18        // SAFETY: intentionally using ticks
19        process.source_iter(q!(0..5)).tick_batch(&tick)
20    };
21    if persist1 {
22        source = source.persist();
23    }
24
25    let mut source2 = unsafe {
26        // SAFETY: intentionally using ticks
27        process.source_iter(q!(3..6)).tick_batch(&tick)
28    };
29    if persist2 {
30        source2 = source2.persist();
31    }
32
33    source.filter_not_in(source2).all_ticks().for_each(q!(|v| {
34        output.send(v).unwrap();
35    }));
36
37    flow.compile_no_network::<SingleProcessGraph>()
38}
39
40#[stageleft::entry]
41pub fn test_anti_join<'a>(
42    flow: FlowBuilder<'a>,
43    output: RuntimeData<&'a UnboundedSender<u32>>,
44    persist1: bool,
45    persist2: bool,
46) -> impl Quoted<'a, Dfir<'a>> {
47    let process = flow.process::<()>();
48    let tick = process.tick();
49
50    let mut source = unsafe {
51        // SAFETY: intentionally using ticks
52        process
53            .source_iter(q!(0..5))
54            .map(q!(|v| (v, v)))
55            .tick_batch(&tick)
56    };
57    if persist1 {
58        source = source.persist();
59    }
60
61    let mut source2 = unsafe {
62        // SAFETY: intentionally using ticks
63        process.source_iter(q!(3..6)).tick_batch(&tick)
64    };
65    if persist2 {
66        source2 = source2.persist();
67    }
68
69    source.anti_join(source2).all_ticks().for_each(q!(|v| {
70        output.send(v.0).unwrap();
71    }));
72
73    flow.compile_no_network::<SingleProcessGraph>()
74}
75
76#[cfg(stageleft_runtime)]
77#[cfg(test)]
78mod tests {
79    use dfir_rs::assert_graphvis_snapshots;
80    use dfir_rs::util::collect_ready;
81
82    #[test]
83    fn test_difference_tick_tick() {
84        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
85
86        let mut flow = super::test_difference!(&out, false, false);
87        assert_graphvis_snapshots!(flow);
88
89        flow.run_tick();
90
91        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
92
93        flow.run_tick();
94
95        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
96    }
97
98    #[test]
99    fn test_difference_tick_static() {
100        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
101
102        let mut flow = super::test_difference!(&out, false, true);
103        assert_graphvis_snapshots!(flow);
104
105        flow.run_tick();
106
107        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
108
109        flow.run_tick();
110
111        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
112    }
113
114    #[test]
115    fn test_difference_static_tick() {
116        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
117
118        let mut flow = super::test_difference!(&out, true, false);
119        assert_graphvis_snapshots!(flow);
120
121        flow.run_tick();
122
123        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
124
125        flow.run_tick();
126
127        assert_eq!(
128            &*collect_ready::<Vec<_>, _>(&mut out_recv),
129            &[0, 1, 2, 3, 4]
130        );
131    }
132
133    #[test]
134    fn test_difference_static_static() {
135        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
136
137        let mut flow = super::test_difference!(&out, true, true);
138        assert_graphvis_snapshots!(flow);
139
140        flow.run_tick();
141
142        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
143
144        flow.run_tick();
145
146        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
147    }
148
149    #[test]
150    fn test_anti_join_tick_tick() {
151        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
152
153        let mut flow = super::test_anti_join!(&out, false, false);
154        assert_graphvis_snapshots!(flow);
155
156        flow.run_tick();
157
158        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
159
160        flow.run_tick();
161
162        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
163    }
164
165    #[test]
166    fn test_anti_join_tick_static() {
167        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
168
169        let mut flow = super::test_anti_join!(&out, false, true);
170        assert_graphvis_snapshots!(flow);
171
172        flow.run_tick();
173
174        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
175
176        flow.run_tick();
177
178        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
179    }
180
181    #[test]
182    fn test_anti_join_static_tick() {
183        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
184
185        let mut flow = super::test_anti_join!(&out, true, false);
186        assert_graphvis_snapshots!(flow);
187
188        flow.run_tick();
189
190        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
191
192        flow.run_tick();
193
194        assert_eq!(
195            &*collect_ready::<Vec<_>, _>(&mut out_recv),
196            &[0, 1, 2, 3, 4]
197        );
198    }
199
200    #[test]
201    fn test_anti_join_static_static() {
202        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
203
204        let mut flow = super::test_anti_join!(&out, true, true);
205        assert_graphvis_snapshots!(flow);
206
207        flow.run_tick();
208
209        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
210
211        flow.run_tick();
212
213        assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
214    }
215}