1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::{Sink, SinkBuild, forward_sink};
7
8pin_project! {
9 #[must_use = "sinks do nothing unless polled"]
13 pub struct FilterMap<Si, Func> {
14 #[pin]
15 sink: Si,
16 func: Func,
17 }
18}
19
20impl<Si, Func> FilterMap<Si, Func> {
21 pub fn new<Item>(func: Func, sink: Si) -> Self
23 where
24 Self: Sink<Item>,
25 {
26 Self { sink, func }
27 }
28}
29
30impl<Si, Func, Item, Out> Sink<Item> for FilterMap<Si, Func>
31where
32 Si: Sink<Out>,
33 Func: FnMut(Item) -> Option<Out>,
34{
35 type Error = Si::Error;
36
37 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
38 let this = self.project();
39 if let Some(item) = (this.func)(item) {
40 this.sink.start_send(item)
41 } else {
42 Ok(())
43 }
44 }
45
46 forward_sink!(poll_ready, poll_flush, poll_close);
47}
48
49pub struct FilterMapBuilder<Prev, Func> {
51 pub(crate) prev: Prev,
52 pub(crate) func: Func,
53}
54impl<Prev, ItemOut, Func> SinkBuild for FilterMapBuilder<Prev, Func>
55where
56 Prev: SinkBuild,
57 Func: FnMut(Prev::Item) -> Option<ItemOut>,
58{
59 type Item = ItemOut;
60
61 type Output<Next: Sink<ItemOut>> = Prev::Output<FilterMap<Next, Func>>;
62
63 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
64 where
65 Next: Sink<ItemOut>,
66 {
67 self.prev.send_to(FilterMap::new(self.func, next))
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use futures_util::stream::StreamExt;
74 use tokio::sync::mpsc::channel;
75 use tokio_stream::wrappers::ReceiverStream;
76 use tokio_util::sync::PollSender;
77
78 use super::*;
79 use crate::sink::SinkExt;
80
81 #[tokio::test]
82 async fn test_filter_map() {
83 let (out_send, out_recv) = channel(1);
84 let out_send = PollSender::new(out_send);
85 let mut out_recv = ReceiverStream::new(out_recv);
86
87 let mut sink = FilterMap::new(core::convert::identity, out_send);
88
89 let a = tokio::task::spawn(async move {
90 sink.send(Some(0)).await.unwrap();
91 sink.send(Some(1)).await.unwrap();
92 sink.send(None).await.unwrap();
93 sink.send(Some(2)).await.unwrap();
94 sink.send(None).await.unwrap();
95 sink.send(None).await.unwrap();
96 sink.send(Some(3)).await.unwrap();
97 sink.send(None).await.unwrap();
98 sink.send(None).await.unwrap();
99 sink.send(None).await.unwrap();
100 sink.send(Some(4)).await.unwrap();
101 sink.send(Some(5)).await.unwrap();
102 sink.send(None).await.unwrap();
103 });
104 println!("{}", line!());
105 assert_eq!(
106 &[0, 1, 2, 3, 4, 5],
107 &*out_recv.by_ref().collect::<Vec<_>>().await
108 );
109 println!("{}", line!());
110 a.await.unwrap();
111 }
112}