1#[cfg(feature = "demux")]
10pub mod demux;
11pub mod filter;
12pub mod filter_map;
13pub mod flatten;
14pub mod for_each;
15pub mod inspect;
16pub mod map;
17pub mod null;
18pub mod partition;
19pub mod pivot;
20pub mod switch;
21pub mod tee;
22pub mod unzip;
23
24use std::marker::PhantomData;
25
26use either::Either;
27
28pub trait Pusherator: Sized {
29 type Item;
30 fn give(&mut self, item: Self::Item);
31}
32
33pub trait IteratorToPusherator: Iterator {
34 fn pull_to_push(self) -> pivot::PivotBuild<Self>
35 where
36 Self: Sized,
37 {
38 pivot::PivotBuild::new(self)
39 }
40}
41impl<I> IteratorToPusherator for I where I: Sized + Iterator {}
42
43pub trait PusheratorBuild {
44 type ItemOut;
45
46 type Output<Next: Pusherator<Item = Self::ItemOut>>;
47 fn push_to<Next>(self, input: Next) -> Self::Output<Next>
48 where
49 Next: Pusherator<Item = Self::ItemOut>;
50
51 fn map<Func, Out>(self, func: Func) -> map::MapBuild<Self, Func>
52 where
53 Self: Sized,
54 Func: FnMut(Self::ItemOut) -> Out,
55 {
56 map::MapBuild::new(self, func)
57 }
58
59 fn inspect<Func>(self, func: Func) -> inspect::InspectBuild<Self, Func>
60 where
61 Self: Sized,
62 Func: FnMut(&Self::ItemOut),
63 {
64 inspect::InspectBuild::new(self, func)
65 }
66
67 fn filter<Func>(self, func: Func) -> filter::FilterBuild<Self, Func>
68 where
69 Self: Sized,
70 Func: FnMut(&Self::ItemOut) -> bool,
71 {
72 filter::FilterBuild::new(self, func)
73 }
74
75 fn tee<Next1>(self, next1: Next1) -> tee::TeeBuild<Self, Next1>
76 where
77 Self: Sized,
78 Self::ItemOut: Clone,
79 Next1: Pusherator<Item = Self::ItemOut>,
80 {
81 tee::TeeBuild::new(self, next1)
82 }
83
84 fn unzip<Next1, Item2>(self, next1: Next1) -> unzip::UnzipBuild<Self, Next1>
85 where
86 Self: Sized,
87 Self: PusheratorBuild<ItemOut = (Next1::Item, Item2)>,
88 Next1: Pusherator,
89 {
90 unzip::UnzipBuild::new(self, next1)
91 }
92
93 fn switch<Next1, Item2>(self, next1: Next1) -> switch::SwitchBuild<Self, Next1>
94 where
95 Self: Sized,
96 Self: PusheratorBuild<ItemOut = Either<Next1::Item, Item2>>,
97 Next1: Pusherator,
98 {
99 switch::SwitchBuild::new(self, next1)
100 }
101
102 fn for_each<Func>(self, func: Func) -> Self::Output<for_each::ForEach<Func, Self::ItemOut>>
103 where
104 Self: Sized,
105 Func: FnMut(Self::ItemOut),
106 {
107 self.push_to(for_each::ForEach::new(func))
108 }
109
110 #[cfg(feature = "demux")]
111 fn demux<Func, Nexts>(
112 self,
113 func: Func,
114 nexts: Nexts,
115 ) -> Self::Output<demux::Demux<Func, Nexts, Self::ItemOut>>
116 where
117 Self: Sized,
118 Func: FnMut(Self::ItemOut, &mut Nexts),
119 {
120 self.push_to(demux::Demux::new(func, nexts))
121 }
122}
123
124pub struct InputBuild<T>(PhantomData<T>);
125impl<T> Default for InputBuild<T> {
126 fn default() -> Self {
127 Self(PhantomData)
128 }
129}
130impl<T> InputBuild<T> {
131 pub fn new() -> Self {
132 Default::default()
133 }
134}
135impl<T> PusheratorBuild for InputBuild<T> {
136 type ItemOut = T;
137
138 type Output<O: Pusherator<Item = Self::ItemOut>> = O;
139 fn push_to<O>(self, input: O) -> Self::Output<O>
140 where
141 O: Pusherator<Item = Self::ItemOut>,
142 {
143 input
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use std::rc::Rc;
150
151 use super::Pusherator;
152 use super::filter::Filter;
153 use super::for_each::ForEach;
154 use super::map::Map;
155 use super::partition::Partition;
156 use super::pivot::Pivot;
157 use super::tee::Tee;
158
159 #[test]
160 fn linear_chains() {
161 let mut v = Vec::new();
162 let mut pusher = Map::new(
163 |x| x * 2,
164 Filter::new(|x| *x > 5, ForEach::new(|x| v.push(x))),
165 );
166
167 for i in 0..5 {
168 pusher.give(i);
169 }
170
171 assert_eq!(v, vec![6, 8]);
172 }
173
174 #[test]
175 fn partition() {
176 let mut evens = Vec::new();
177 let mut odds = Vec::new();
178 let mut pusher = Partition::new(
179 |x| x % 2 == 0,
180 ForEach::new(|x| evens.push(x)),
181 ForEach::new(|x| odds.push(x)),
182 );
183
184 for i in 0..5 {
185 pusher.give(i);
186 }
187
188 assert_eq!(evens, vec![0, 2, 4]);
189 assert_eq!(odds, vec![1, 3]);
190 }
191
192 #[test]
193 fn tee() {
194 let mut left = Vec::new();
195 let mut right = Vec::new();
196 let mut pusher = Tee::new(
197 ForEach::new(|x| left.push(x)),
198 ForEach::new(|x| right.push(x)),
199 );
200
201 for i in 0..5 {
202 pusher.give(i);
203 }
204
205 assert_eq!(left, vec![0, 1, 2, 3, 4]);
206 assert_eq!(right, vec![0, 1, 2, 3, 4]);
207 }
208
209 #[test]
210 fn tee_rcs() {
211 let mut left = Vec::new();
212 let mut right = Vec::new();
213 let mut pusher = Map::new(
214 Rc::new,
215 Tee::new(
216 ForEach::new(|x: Rc<i32>| left.push(*x)),
217 ForEach::new(|x: Rc<i32>| right.push(*x)),
218 ),
219 );
220
221 for i in 0..5 {
222 pusher.give(i);
223 }
224
225 assert_eq!(left, vec![0, 1, 2, 3, 4]);
226 assert_eq!(right, vec![0, 1, 2, 3, 4]);
227 }
228
229 #[test]
230 fn pivot() {
231 let a = 0..10;
232 let b = 10..20;
233
234 let mut left = Vec::new();
235 let mut right = Vec::new();
236
237 let pivot = Pivot::new(
238 a.into_iter().chain(b),
239 Partition::new(
240 |x| x % 2 == 0,
241 ForEach::new(|x| left.push(x)),
242 ForEach::new(|x| right.push(x)),
243 ),
244 );
245
246 pivot.run();
247
248 assert_eq!(left, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
249 assert_eq!(right, vec![1, 3, 5, 7, 9, 11, 13, 15, 17, 19]);
250 }
251}
252
253#[cfg(test)]
254mod test_builder {
255 use super::*;
256
257 #[test]
258 fn test_builder_constructed() {
259 let pb = InputBuild::<usize>(PhantomData);
260 let pb = filter::FilterBuild::new(pb, |&x| 0 == x % 2);
261 let pb = map::MapBuild::new(pb, |x| x * x);
262
263 let mut output = Vec::new();
264 let mut pusherator = pb.push_to(for_each::ForEach::new(|x| output.push(x)));
265
266 for x in 0..10 {
267 pusherator.give(x);
268 }
269
270 assert_eq!(&[0, 4, 16, 36, 64], &*output);
271 }
272
273 #[test]
274 fn test_builder() {
275 let mut output = Vec::new();
276
277 let mut pusherator = <InputBuild<usize>>::new()
278 .filter(|&x| 0 == x % 2)
279 .map(|x| x * x)
280 .for_each(|x| output.push(x));
281
282 for x in 0..10 {
283 pusherator.give(x);
284 }
285
286 assert_eq!(&[0, 4, 16, 36, 64], &*output);
287 }
288
289 #[test]
290 fn test_builder_tee() {
291 let mut output_evn = Vec::new();
292 let mut output_odd = Vec::new();
293
294 let mut pusherator = <InputBuild<usize>>::new()
295 .tee(
296 <InputBuild<usize>>::new()
297 .filter(|&x| 0 == x % 2)
298 .for_each(|x| output_evn.push(x)),
299 )
300 .filter(|&x| 1 == x % 2)
301 .for_each(|x| output_odd.push(x));
302
303 for x in 0..10 {
304 pusherator.give(x);
305 }
306
307 assert_eq!(&[0, 2, 4, 6, 8], &*output_evn);
308 assert_eq!(&[1, 3, 5, 7, 9], &*output_odd);
309 }
310
311 #[test]
312 fn test_built_subgraph() {
313 let mut output_evn = Vec::new();
314 let mut output_odd = Vec::new();
315
316 let pivot = [1, 2, 3, 4, 5]
317 .into_iter()
318 .chain([3, 4, 5, 6, 7])
319 .map(|x| x * 9)
320 .pull_to_push()
321 .map(|x| if 0 == x % 2 { x / 2 } else { 3 * x + 1 })
322 .tee(
323 <InputBuild<usize>>::new()
324 .filter(|&x| 0 == x % 2)
325 .for_each(|x| output_evn.push(x)),
326 )
327 .filter(|&x| 1 == x % 2)
328 .for_each(|x| output_odd.push(x));
329
330 pivot.run();
331
332 assert_eq!(&[28, 82, 18, 136, 82, 18, 136, 190], &*output_evn);
333 assert_eq!(&[9, 27], &*output_odd);
334 }
335}