sinktools/
send_stream.rs

1//! [`SendStream`] and related items.
2use core::pin::Pin;
3use core::task::{Context, Poll, ready};
4
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7
8use crate::{Sink, SinkBuild};
9
10pin_project! {
11    /// [`Future`] for pulling from an [`Iterator`] and pushing to a [`Sink`].
12    #[must_use = "futures do nothing unless polled"]
13    pub struct SendStream<Pull, Push> {
14        #[pin]
15        pull: Pull,
16        #[pin]
17        push: Push,
18    }
19}
20impl<Pull, Push> SendStream<Pull, Push>
21where
22    Self: Future,
23{
24    /// Create a new [`SendStream`] from the given `pull` and `push` sides.
25    pub fn new(pull: Pull, push: Push) -> Self {
26        Self { pull, push }
27    }
28}
29impl<Pull, Push> Future for SendStream<Pull, Push>
30where
31    Pull: Stream,
32    Push: Sink<Pull::Item>,
33{
34    type Output = Result<(), Push::Error>;
35
36    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37        let mut this = self.project();
38
39        loop {
40            ready!(this.push.as_mut().poll_ready(cx)?);
41            if let Some(item) = ready!(this.pull.as_mut().poll_next(cx)) {
42                let () = this.push.as_mut().start_send(item)?;
43            } else {
44                break;
45            }
46        }
47        this.push.as_mut().poll_flush(cx)
48    }
49}
50
/// [`SinkBuild`] for [`SendStream`].
///
/// Holds the stream that will become the pull side of a [`SendStream`]; the push side is
/// supplied later through [`SinkBuild::send_to`].
pub struct SendStreamBuild<St> {
    /// The stream whose items will be sent into the sink.
    pub(crate) stream: St,
}
55impl<St> SinkBuild for SendStreamBuild<St>
56where
57    St: Stream,
58{
59    type Item = St::Item;
60
61    type Output<Next: Sink<Self::Item>> = SendStream<St, Next>;
62    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
63    where
64        Next: Sink<Self::Item>,
65    {
66        SendStream::new(self.stream, next)
67    }
68}