sinktools/
send_iter.rs

1//! [`SendIter`] and related items.
2use core::pin::Pin;
3use core::task::{Context, Poll, ready};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, SinkBuild};
8
9pin_project! {
10    /// [`Future`] for pulling from an [`Iterator`] and pushing to a [`Sink`].
11    #[must_use = "futures do nothing unless polled"]
12    pub struct SendIter<Pull, Push> {
13        pull: Pull,
14        #[pin]
15        push: Push,
16    }
17}
18impl<Pull, Push> SendIter<Pull, Push>
19where
20    Self: Future,
21{
22    /// Create a new [`SendIter`] from the given `pull` and `push` sides.
23    pub fn new(pull: Pull, push: Push) -> Self {
24        Self { pull, push }
25    }
26}
27impl<Pull, Push> Future for SendIter<Pull, Push>
28where
29    Pull: Iterator,
30    Push: Sink<Pull::Item>,
31{
32    type Output = Result<(), Push::Error>;
33
34    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35        let mut this = self.project();
36
37        loop {
38            ready!(this.push.as_mut().poll_ready(cx)?);
39            if let Some(item) = this.pull.next() {
40                let () = this.push.as_mut().start_send(item)?;
41            } else {
42                break;
43            }
44        }
45        this.push.as_mut().poll_flush(cx)
46    }
47}
48
/// [`SinkBuild`] that produces a [`SendIter`].
///
/// Holds the iterator that will become the pull side of the [`SendIter`]
/// once a downstream sink is supplied.
pub struct SendIterBuild<Iter> {
    /// The iterator whose items will be sent.
    pub(crate) iter: Iter,
}
53impl<Iter> SinkBuild for SendIterBuild<Iter>
54where
55    Iter: Iterator,
56{
57    type Item = Iter::Item;
58
59    type Output<Next: Sink<Self::Item>> = SendIter<Iter, Next>;
60    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
61    where
62        Next: Sink<Self::Item>,
63    {
64        SendIter::new(self.iter, next)
65    }
66}