sinktools/
filter_map.rs

1//! [`FilterMap`] and related items.
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::{Sink, SinkBuild, forward_sink};
7
8pin_project! {
9    /// Same as [`core::iterator::FilterMap`] but as a [`Sink`].
10    ///
11    /// Synchronously filter-maps items and sends the outputs to the following sink.
12    #[must_use = "sinks do nothing unless polled"]
13    pub struct FilterMap<Si, Func> {
14        #[pin]
15        sink: Si,
16        func: Func,
17    }
18}
19
20impl<Si, Func> FilterMap<Si, Func> {
21    /// Creates with mapping `func` and next `sink`.
22    pub fn new<Item>(func: Func, sink: Si) -> Self
23    where
24        Self: Sink<Item>,
25    {
26        Self { sink, func }
27    }
28}
29
30impl<Si, Func, Item, Out> Sink<Item> for FilterMap<Si, Func>
31where
32    Si: Sink<Out>,
33    Func: FnMut(Item) -> Option<Out>,
34{
35    type Error = Si::Error;
36
37    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
38        let this = self.project();
39        if let Some(item) = (this.func)(item) {
40            this.sink.start_send(item)
41        } else {
42            Ok(())
43        }
44    }
45
46    forward_sink!(poll_ready, poll_flush, poll_close);
47}
48
49/// [`SinkBuild`] for [`FilterMap`].
50pub struct FilterMapBuilder<Prev, Func> {
51    pub(crate) prev: Prev,
52    pub(crate) func: Func,
53}
54impl<Prev, ItemOut, Func> SinkBuild for FilterMapBuilder<Prev, Func>
55where
56    Prev: SinkBuild,
57    Func: FnMut(Prev::Item) -> Option<ItemOut>,
58{
59    type Item = ItemOut;
60
61    type Output<Next: Sink<ItemOut>> = Prev::Output<FilterMap<Next, Func>>;
62
63    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
64    where
65        Next: Sink<ItemOut>,
66    {
67        self.prev.send_to(FilterMap::new(self.func, next))
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use futures_util::stream::StreamExt;
74    use tokio::sync::mpsc::channel;
75    use tokio_stream::wrappers::ReceiverStream;
76    use tokio_util::sync::PollSender;
77
78    use super::*;
79    use crate::sink::SinkExt;
80
81    #[tokio::test]
82    async fn test_filter_map() {
83        let (out_send, out_recv) = channel(1);
84        let out_send = PollSender::new(out_send);
85        let mut out_recv = ReceiverStream::new(out_recv);
86
87        let mut sink = FilterMap::new(core::convert::identity, out_send);
88
89        let a = tokio::task::spawn(async move {
90            sink.send(Some(0)).await.unwrap();
91            sink.send(Some(1)).await.unwrap();
92            sink.send(None).await.unwrap();
93            sink.send(Some(2)).await.unwrap();
94            sink.send(None).await.unwrap();
95            sink.send(None).await.unwrap();
96            sink.send(Some(3)).await.unwrap();
97            sink.send(None).await.unwrap();
98            sink.send(None).await.unwrap();
99            sink.send(None).await.unwrap();
100            sink.send(Some(4)).await.unwrap();
101            sink.send(Some(5)).await.unwrap();
102            sink.send(None).await.unwrap();
103        });
104        println!("{}", line!());
105        assert_eq!(
106            &[0, 1, 2, 3, 4, 5],
107            &*out_recv.by_ref().collect::<Vec<_>>().await
108        );
109        println!("{}", line!());
110        a.await.unwrap();
111    }
112}