1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::{Sink, SinkBuild, forward_sink};
7
8pin_project! {
9 #[must_use = "sinks do nothing unless polled"]
13 pub struct Filter<Si, Func> {
14 #[pin]
15 sink: Si,
16 func: Func,
17 }
18}
19
20impl<Si, Func> Filter<Si, Func> {
21 pub fn new<Item>(func: Func, sink: Si) -> Self
23 where
24 Self: Sink<Item>,
25 {
26 Self { sink, func }
27 }
28}
29
30impl<Si, Func, Item> Sink<Item> for Filter<Si, Func>
31where
32 Si: Sink<Item>,
33 Func: FnMut(&Item) -> bool,
34{
35 type Error = Si::Error;
36
37 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
38 let this = self.project();
39 if (this.func)(&item) {
40 this.sink.start_send(item)
41 } else {
42 Ok(())
43 }
44 }
45
46 forward_sink!(poll_ready, poll_flush, poll_close);
47}
48
49pub struct FilterBuilder<Prev, Func> {
51 pub(crate) prev: Prev,
52 pub(crate) func: Func,
53}
54impl<Prev, Func> SinkBuild for FilterBuilder<Prev, Func>
55where
56 Prev: SinkBuild,
57 Func: FnMut(&Prev::Item) -> bool,
58{
59 type Item = Prev::Item;
60
61 type Output<Next: Sink<Prev::Item>> = Prev::Output<Filter<Next, Func>>;
62
63 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
64 where
65 Next: Sink<Prev::Item>,
66 {
67 self.prev.send_to(Filter::new(self.func, next))
68 }
69}