sinktools/
flat_map.rs

1//! [`FlatMap`] and related items.
2use core::pin::Pin;
3use core::task::{Context, Poll, ready};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, SinkBuild};
8
9pin_project! {
10    /// Same as [`core::iterator::FlatMap`] but as a [`Sink`].
11    ///
12    /// Synchronously maps and flattens items, and sends the outputs to the following sink.
13    #[must_use = "sinks do nothing unless polled"]
14    pub struct FlatMap<Si, Func, IntoIter>
15    where
16        IntoIter: IntoIterator,
17    {
18        #[pin]
19        sink: Si,
20        func: Func,
21        // Current iterator and the next item.
22        iter_next: Option<(IntoIter::IntoIter, IntoIter::Item)>,
23    }
24}
25
26impl<Si, Func, IntoIter> FlatMap<Si, Func, IntoIter>
27where
28    IntoIter: IntoIterator,
29{
30    /// Create with flat-mapping function `func` and next `sink`.
31    pub fn new<Item>(func: Func, sink: Si) -> Self
32    where
33        Self: Sink<Item>,
34    {
35        Self {
36            sink,
37            func,
38            iter_next: None,
39        }
40    }
41
42    fn poll_ready_impl(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
43    where
44        Si: Sink<IntoIter::Item>,
45    {
46        let mut this = self.project();
47
48        while this.iter_next.is_some() {
49            // Ensure following sink is ready.
50            ready!(this.sink.as_mut().poll_ready(cx))?;
51
52            // Send the item.
53            let (mut iter, next) = this.iter_next.take().unwrap();
54            this.sink.as_mut().start_send(next)?;
55
56            // Replace the iterator and next item (if any).
57            *this.iter_next = iter.next().map(|next| (iter, next));
58        }
59
60        Poll::Ready(Ok(()))
61    }
62}
63
64impl<Si, Func, Item, IntoIter> Sink<Item> for FlatMap<Si, Func, IntoIter>
65where
66    Si: Sink<IntoIter::Item>,
67    Func: FnMut(Item) -> IntoIter,
68    IntoIter: IntoIterator,
69{
70    type Error = Si::Error;
71
72    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        self.poll_ready_impl(cx)
74    }
75
76    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
77        let this = self.project();
78
79        assert!(
80            this.iter_next.is_none(),
81            "Sink not ready: `poll_ready` must be called and return `Ready` before `start_send` is called."
82        );
83        let mut iter = (this.func)(item).into_iter();
84        *this.iter_next = iter.next().map(|next| (iter, next));
85        Ok(())
86    }
87
88    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        ready!(self.as_mut().poll_ready_impl(cx)?);
90        self.project().sink.poll_flush(cx)
91    }
92
93    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        ready!(self.as_mut().poll_ready_impl(cx)?);
95        self.project().sink.poll_close(cx)
96    }
97}
98
/// Builder half of [`FlatMap`]: a [`SinkBuild`] stage that holds the mapping function until the
/// downstream sink is supplied.
pub struct FlatMapBuilder<Prev, Func> {
    /// The preceding builder stage in the chain.
    pub(crate) prev: Prev,
    /// The flat-mapping function handed to [`FlatMap`] when the chain is built.
    pub(crate) func: Func,
}
104impl<Prev, Func, IntoIter> SinkBuild for FlatMapBuilder<Prev, Func>
105where
106    Prev: SinkBuild,
107    Func: FnMut(Prev::Item) -> IntoIter,
108    IntoIter: IntoIterator,
109{
110    type Item = IntoIter::Item;
111
112    type Output<Next: Sink<IntoIter::Item>> = Prev::Output<FlatMap<Next, Func, IntoIter>>;
113
114    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
115    where
116        Next: Sink<IntoIter::Item>,
117    {
118        self.prev.send_to(FlatMap::new(self.func, next))
119    }
120}