1use core::pin::Pin;
3use core::task::{Context, Poll, ready};
4
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7
8use crate::{Sink, SinkBuild};
9
10pin_project! {
11 #[must_use = "futures do nothing unless polled"]
13 pub struct SendStream<Pull, Push> {
14 #[pin]
15 pull: Pull,
16 #[pin]
17 push: Push,
18 }
19}
20impl<Pull, Push> SendStream<Pull, Push>
21where
22 Self: Future,
23{
24 pub fn new(pull: Pull, push: Push) -> Self {
26 Self { pull, push }
27 }
28}
29impl<Pull, Push> Future for SendStream<Pull, Push>
30where
31 Pull: Stream,
32 Push: Sink<Pull::Item>,
33{
34 type Output = Result<(), Push::Error>;
35
36 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37 let mut this = self.project();
38
39 loop {
40 ready!(this.push.as_mut().poll_ready(cx)?);
41 if let Some(item) = ready!(this.pull.as_mut().poll_next(cx)) {
42 let () = this.push.as_mut().start_send(item)?;
43 } else {
44 break;
45 }
46 }
47 this.push.as_mut().poll_flush(cx)
48 }
49}
50
/// Builder that holds a stream until a sink is supplied for it.
///
/// Its `SinkBuild` implementation turns the held stream and the given sink
/// into a `SendStream` future.
pub struct SendStreamBuild<St> {
    // The stream whose items will be sent once a sink is attached.
    pub(crate) stream: St,
}
55impl<St> SinkBuild for SendStreamBuild<St>
56where
57 St: Stream,
58{
59 type Item = St::Item;
60
61 type Output<Next: Sink<Self::Item>> = SendStream<St, Next>;
62 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
63 where
64 Next: Sink<Self::Item>,
65 {
66 SendStream::new(self.stream, next)
67 }
68}