pusherator/
lib.rs

1//! Pusherator generics and argument order conventions:
2//! - `Next` (being the next owned pusherator) should come first in generic
3//!   arguments.
4//! - However `next: Next` in `new(...)` arguments should come last. This is so
5//!   the rest of the arguments appear in the order data flows in.
6//! - Any closures `Func` should come before their arguments, so:
7//!   `<Func: Fn(A) -> B, A, B>`
8
9#[cfg(feature = "demux")]
10pub mod demux;
11pub mod filter;
12pub mod filter_map;
13pub mod flatten;
14pub mod for_each;
15pub mod inspect;
16pub mod map;
17pub mod null;
18pub mod partition;
19pub mod pivot;
20pub mod switch;
21pub mod tee;
22pub mod unzip;
23
24use std::marker::PhantomData;
25
26use either::Either;
27
/// A consumer that items are pushed into, one at a time, through
/// [`give`](Pusherator::give).
pub trait Pusherator
where
    Self: Sized,
{
    /// The type of item this pusherator accepts.
    type Item;

    /// Hands a single `item` to this pusherator.
    fn give(&mut self, item: Self::Item);
}
32
33pub trait IteratorToPusherator: Iterator {
34    fn pull_to_push(self) -> pivot::PivotBuild<Self>
35    where
36        Self: Sized,
37    {
38        pivot::PivotBuild::new(self)
39    }
40}
41impl<I> IteratorToPusherator for I where I: Sized + Iterator {}
42
43pub trait PusheratorBuild {
44    type ItemOut;
45
46    type Output<Next: Pusherator<Item = Self::ItemOut>>;
47    fn push_to<Next>(self, input: Next) -> Self::Output<Next>
48    where
49        Next: Pusherator<Item = Self::ItemOut>;
50
51    fn map<Func, Out>(self, func: Func) -> map::MapBuild<Self, Func>
52    where
53        Self: Sized,
54        Func: FnMut(Self::ItemOut) -> Out,
55    {
56        map::MapBuild::new(self, func)
57    }
58
59    fn inspect<Func>(self, func: Func) -> inspect::InspectBuild<Self, Func>
60    where
61        Self: Sized,
62        Func: FnMut(&Self::ItemOut),
63    {
64        inspect::InspectBuild::new(self, func)
65    }
66
67    fn filter<Func>(self, func: Func) -> filter::FilterBuild<Self, Func>
68    where
69        Self: Sized,
70        Func: FnMut(&Self::ItemOut) -> bool,
71    {
72        filter::FilterBuild::new(self, func)
73    }
74
75    fn tee<Next1>(self, next1: Next1) -> tee::TeeBuild<Self, Next1>
76    where
77        Self: Sized,
78        Self::ItemOut: Clone,
79        Next1: Pusherator<Item = Self::ItemOut>,
80    {
81        tee::TeeBuild::new(self, next1)
82    }
83
84    fn unzip<Next1, Item2>(self, next1: Next1) -> unzip::UnzipBuild<Self, Next1>
85    where
86        Self: Sized,
87        Self: PusheratorBuild<ItemOut = (Next1::Item, Item2)>,
88        Next1: Pusherator,
89    {
90        unzip::UnzipBuild::new(self, next1)
91    }
92
93    fn switch<Next1, Item2>(self, next1: Next1) -> switch::SwitchBuild<Self, Next1>
94    where
95        Self: Sized,
96        Self: PusheratorBuild<ItemOut = Either<Next1::Item, Item2>>,
97        Next1: Pusherator,
98    {
99        switch::SwitchBuild::new(self, next1)
100    }
101
102    fn for_each<Func>(self, func: Func) -> Self::Output<for_each::ForEach<Func, Self::ItemOut>>
103    where
104        Self: Sized,
105        Func: FnMut(Self::ItemOut),
106    {
107        self.push_to(for_each::ForEach::new(func))
108    }
109
110    #[cfg(feature = "demux")]
111    fn demux<Func, Nexts>(
112        self,
113        func: Func,
114        nexts: Nexts,
115    ) -> Self::Output<demux::Demux<Func, Nexts, Self::ItemOut>>
116    where
117        Self: Sized,
118        Func: FnMut(Self::ItemOut, &mut Nexts),
119    {
120        self.push_to(demux::Demux::new(func, nexts))
121    }
122}
123
124pub struct InputBuild<T>(PhantomData<T>);
125impl<T> Default for InputBuild<T> {
126    fn default() -> Self {
127        Self(PhantomData)
128    }
129}
130impl<T> InputBuild<T> {
131    pub fn new() -> Self {
132        Default::default()
133    }
134}
135impl<T> PusheratorBuild for InputBuild<T> {
136    type ItemOut = T;
137
138    type Output<O: Pusherator<Item = Self::ItemOut>> = O;
139    fn push_to<O>(self, input: O) -> Self::Output<O>
140    where
141        O: Pusherator<Item = Self::ItemOut>,
142    {
143        input
144    }
145}
146
// Tests that assemble pusherators directly through their `new` constructors.
#[cfg(test)]
mod tests {
    use std::rc::Rc;

    use super::Pusherator;
    use super::filter::Filter;
    use super::for_each::ForEach;
    use super::map::Map;
    use super::partition::Partition;
    use super::pivot::Pivot;
    use super::tee::Tee;

    // Map, then filter, then collect: only doubled values above 5 survive.
    #[test]
    fn linear_chains() {
        let mut collected = Vec::new();
        let mut chain = Map::new(
            |x| x * 2,
            Filter::new(|x| *x > 5, ForEach::new(|x| collected.push(x))),
        );

        for n in 0..5 {
            chain.give(n);
        }

        assert_eq!(collected, vec![6, 8]);
    }

    // Items satisfying the predicate go to the first sink, the rest to the second.
    #[test]
    fn partition() {
        let mut evens = Vec::new();
        let mut odds = Vec::new();
        let mut splitter = Partition::new(
            |x| x % 2 == 0,
            ForEach::new(|x| evens.push(x)),
            ForEach::new(|x| odds.push(x)),
        );

        for n in 0..5 {
            splitter.give(n);
        }

        assert_eq!(evens, vec![0, 2, 4]);
        assert_eq!(odds, vec![1, 3]);
    }

    // Every item reaches both sinks.
    #[test]
    fn tee() {
        let mut first = Vec::new();
        let mut second = Vec::new();
        let mut fanout = Tee::new(
            ForEach::new(|x| first.push(x)),
            ForEach::new(|x| second.push(x)),
        );

        for n in 0..5 {
            fanout.give(n);
        }

        assert_eq!(first, vec![0, 1, 2, 3, 4]);
        assert_eq!(second, vec![0, 1, 2, 3, 4]);
    }

    // Same as `tee`, but the items are wrapped in `Rc` before being duplicated.
    #[test]
    fn tee_rcs() {
        let mut first = Vec::new();
        let mut second = Vec::new();
        let mut fanout = Map::new(
            Rc::new,
            Tee::new(
                ForEach::new(|x: Rc<i32>| first.push(*x)),
                ForEach::new(|x: Rc<i32>| second.push(*x)),
            ),
        );

        for n in 0..5 {
            fanout.give(n);
        }

        assert_eq!(first, vec![0, 1, 2, 3, 4]);
        assert_eq!(second, vec![0, 1, 2, 3, 4]);
    }

    // A pivot drives a chained iterator into a partitioning pusherator.
    #[test]
    fn pivot() {
        let head = 0..10;
        let tail = 10..20;

        let mut evens = Vec::new();
        let mut odds = Vec::new();

        let driver = Pivot::new(
            head.into_iter().chain(tail),
            Partition::new(
                |x| x % 2 == 0,
                ForEach::new(|x| evens.push(x)),
                ForEach::new(|x| odds.push(x)),
            ),
        );

        driver.run();

        assert_eq!(evens, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
        assert_eq!(odds, vec![1, 3, 5, 7, 9, 11, 13, 15, 17, 19]);
    }
}
252
// Tests that assemble pusherators through the `PusheratorBuild` builder API.
#[cfg(test)]
mod test_builder {
    use super::*;

    // Builders stacked by hand via their `new` constructors instead of the
    // chaining methods.
    #[test]
    fn test_builder_constructed() {
        let builder = InputBuild::<usize>(PhantomData);
        let builder = filter::FilterBuild::new(builder, |&x| 0 == x % 2);
        let builder = map::MapBuild::new(builder, |x| x * x);

        let mut squares = Vec::new();
        let mut chain = builder.push_to(for_each::ForEach::new(|x| squares.push(x)));

        for n in 0..10 {
            chain.give(n);
        }

        assert_eq!(&[0, 4, 16, 36, 64], squares.as_slice());
    }

    // The same chain as above, expressed with the chaining methods.
    #[test]
    fn test_builder() {
        let mut squares = Vec::new();

        let mut chain = InputBuild::<usize>::new()
            .filter(|&x| 0 == x % 2)
            .map(|x| x * x)
            .for_each(|x| squares.push(x));

        for n in 0..10 {
            chain.give(n);
        }

        assert_eq!(&[0, 4, 16, 36, 64], squares.as_slice());
    }

    // A tee'd side chain keeps the evens while the main chain keeps the odds.
    #[test]
    fn test_builder_tee() {
        let mut evens = Vec::new();
        let mut odds = Vec::new();

        let mut chain = InputBuild::<usize>::new()
            .tee(
                InputBuild::<usize>::new()
                    .filter(|&x| 0 == x % 2)
                    .for_each(|x| evens.push(x)),
            )
            .filter(|&x| 1 == x % 2)
            .for_each(|x| odds.push(x));

        for n in 0..10 {
            chain.give(n);
        }

        assert_eq!(&[0, 2, 4, 6, 8], evens.as_slice());
        assert_eq!(&[1, 3, 5, 7, 9], odds.as_slice());
    }

    // A pull-side iterator chain is pivoted into a push-side subgraph and run
    // to completion.
    #[test]
    fn test_built_subgraph() {
        let mut evens = Vec::new();
        let mut odds = Vec::new();

        let graph = [1, 2, 3, 4, 5]
            .into_iter()
            .chain([3, 4, 5, 6, 7])
            .map(|x| x * 9)
            .pull_to_push()
            .map(|x| if 0 == x % 2 { x / 2 } else { 3 * x + 1 })
            .tee(
                InputBuild::<usize>::new()
                    .filter(|&x| 0 == x % 2)
                    .for_each(|x| evens.push(x)),
            )
            .filter(|&x| 1 == x % 2)
            .for_each(|x| odds.push(x));

        graph.run();

        assert_eq!(&[28, 82, 18, 136, 82, 18, 136, 190], evens.as_slice());
        assert_eq!(&[9, 27], odds.as_slice());
    }
}