sinktools/
unzip.rs

1//! [`Unzip`].
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, ready_both};
8
9pin_project! {
10    /// Same as [`core::iterator::Unzip`] but as a [`Sink`].
11    ///
12    /// Synchronously maps items and sends the output to the following sink.
13    #[must_use = "sinks do nothing unless polled"]
14    pub struct Unzip<Si0, Si1> {
15        #[pin]
16        sink_0: Si0,
17        #[pin]
18        sink_1: Si1,
19    }
20}
21
22impl<Si0, Si1> Unzip<Si0, Si1> {
23    /// Creates with next sinks `sink_0` and `sink_1`.
24    pub fn new<Item>(sink_0: Si0, sink_1: Si1) -> Self
25    where
26        Self: Sink<Item>,
27    {
28        Self { sink_0, sink_1 }
29    }
30}
31
32impl<Si0, Si1, Item0, Item1> Sink<(Item0, Item1)> for Unzip<Si0, Si1>
33where
34    Si0: Sink<Item0>,
35    Si1: Sink<Item1>,
36    Si0::Error: From<Si1::Error>,
37{
38    type Error = Si0::Error;
39
40    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
41        let this = self.project();
42        ready_both!(this.sink_0.poll_ready(cx)?, this.sink_1.poll_ready(cx)?,);
43        Poll::Ready(Ok(()))
44    }
45    fn start_send(self: Pin<&mut Self>, item: (Item0, Item1)) -> Result<(), Self::Error> {
46        let this = self.project();
47        this.sink_0.start_send(item.0)?;
48        this.sink_1.start_send(item.1)?;
49        Ok(())
50    }
51    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52        let this = self.project();
53        ready_both!(this.sink_0.poll_flush(cx)?, this.sink_1.poll_flush(cx)?,);
54        Poll::Ready(Ok(()))
55    }
56    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        let this = self.project();
58        ready_both!(this.sink_0.poll_close(cx)?, this.sink_1.poll_close(cx)?,);
59        Poll::Ready(Ok(()))
60    }
61}