hydro_test_local_macro/local/
negation.rs
1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use hydro_lang::deploy::SingleProcessGraph;
3use hydro_lang::dfir_rs::scheduled::graph::Dfir;
4use hydro_lang::*;
5use stageleft::{Quoted, RuntimeData};
6
7#[stageleft::entry]
8pub fn test_difference<'a>(
9 flow: FlowBuilder<'a>,
10 output: RuntimeData<&'a UnboundedSender<u32>>,
11 persist1: bool,
12 persist2: bool,
13) -> impl Quoted<'a, Dfir<'a>> {
14 let process = flow.process::<()>();
15 let tick = process.tick();
16
17 let mut source = unsafe {
18 process.source_iter(q!(0..5)).tick_batch(&tick)
20 };
21 if persist1 {
22 source = source.persist();
23 }
24
25 let mut source2 = unsafe {
26 process.source_iter(q!(3..6)).tick_batch(&tick)
28 };
29 if persist2 {
30 source2 = source2.persist();
31 }
32
33 source.filter_not_in(source2).all_ticks().for_each(q!(|v| {
34 output.send(v).unwrap();
35 }));
36
37 flow.compile_no_network::<SingleProcessGraph>()
38}
39
40#[stageleft::entry]
41pub fn test_anti_join<'a>(
42 flow: FlowBuilder<'a>,
43 output: RuntimeData<&'a UnboundedSender<u32>>,
44 persist1: bool,
45 persist2: bool,
46) -> impl Quoted<'a, Dfir<'a>> {
47 let process = flow.process::<()>();
48 let tick = process.tick();
49
50 let mut source = unsafe {
51 process
53 .source_iter(q!(0..5))
54 .map(q!(|v| (v, v)))
55 .tick_batch(&tick)
56 };
57 if persist1 {
58 source = source.persist();
59 }
60
61 let mut source2 = unsafe {
62 process.source_iter(q!(3..6)).tick_batch(&tick)
64 };
65 if persist2 {
66 source2 = source2.persist();
67 }
68
69 source.anti_join(source2).all_ticks().for_each(q!(|v| {
70 output.send(v.0).unwrap();
71 }));
72
73 flow.compile_no_network::<SingleProcessGraph>()
74}
75
76#[cfg(stageleft_runtime)]
77#[cfg(test)]
78mod tests {
79 use dfir_rs::assert_graphvis_snapshots;
80 use dfir_rs::util::collect_ready;
81
82 #[test]
83 fn test_difference_tick_tick() {
84 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
85
86 let mut flow = super::test_difference!(&out, false, false);
87 assert_graphvis_snapshots!(flow);
88
89 flow.run_tick();
90
91 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
92
93 flow.run_tick();
94
95 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
96 }
97
98 #[test]
99 fn test_difference_tick_static() {
100 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
101
102 let mut flow = super::test_difference!(&out, false, true);
103 assert_graphvis_snapshots!(flow);
104
105 flow.run_tick();
106
107 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
108
109 flow.run_tick();
110
111 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
112 }
113
114 #[test]
115 fn test_difference_static_tick() {
116 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
117
118 let mut flow = super::test_difference!(&out, true, false);
119 assert_graphvis_snapshots!(flow);
120
121 flow.run_tick();
122
123 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
124
125 flow.run_tick();
126
127 assert_eq!(
128 &*collect_ready::<Vec<_>, _>(&mut out_recv),
129 &[0, 1, 2, 3, 4]
130 );
131 }
132
133 #[test]
134 fn test_difference_static_static() {
135 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
136
137 let mut flow = super::test_difference!(&out, true, true);
138 assert_graphvis_snapshots!(flow);
139
140 flow.run_tick();
141
142 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
143
144 flow.run_tick();
145
146 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
147 }
148
149 #[test]
150 fn test_anti_join_tick_tick() {
151 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
152
153 let mut flow = super::test_anti_join!(&out, false, false);
154 assert_graphvis_snapshots!(flow);
155
156 flow.run_tick();
157
158 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
159
160 flow.run_tick();
161
162 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
163 }
164
165 #[test]
166 fn test_anti_join_tick_static() {
167 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
168
169 let mut flow = super::test_anti_join!(&out, false, true);
170 assert_graphvis_snapshots!(flow);
171
172 flow.run_tick();
173
174 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
175
176 flow.run_tick();
177
178 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[] as &[u32]);
179 }
180
181 #[test]
182 fn test_anti_join_static_tick() {
183 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
184
185 let mut flow = super::test_anti_join!(&out, true, false);
186 assert_graphvis_snapshots!(flow);
187
188 flow.run_tick();
189
190 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
191
192 flow.run_tick();
193
194 assert_eq!(
195 &*collect_ready::<Vec<_>, _>(&mut out_recv),
196 &[0, 1, 2, 3, 4]
197 );
198 }
199
200 #[test]
201 fn test_anti_join_static_static() {
202 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
203
204 let mut flow = super::test_anti_join!(&out, true, true);
205 assert_graphvis_snapshots!(flow);
206
207 flow.run_tick();
208
209 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
210
211 flow.run_tick();
212
213 assert_eq!(&*collect_ready::<Vec<_>, _>(&mut out_recv), &[0, 1, 2]);
214 }
215}