1use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, ready_both};
8
9pin_project! {
10 #[must_use = "sinks do nothing unless polled"]
14 pub struct Unzip<Si0, Si1> {
15 #[pin]
16 sink_0: Si0,
17 #[pin]
18 sink_1: Si1,
19 }
20}
21
22impl<Si0, Si1> Unzip<Si0, Si1> {
23 pub fn new<Item>(sink_0: Si0, sink_1: Si1) -> Self
25 where
26 Self: Sink<Item>,
27 {
28 Self { sink_0, sink_1 }
29 }
30}
31
32impl<Si0, Si1, Item0, Item1> Sink<(Item0, Item1)> for Unzip<Si0, Si1>
33where
34 Si0: Sink<Item0>,
35 Si1: Sink<Item1>,
36 Si0::Error: From<Si1::Error>,
37{
38 type Error = Si0::Error;
39
40 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
41 let this = self.project();
42 ready_both!(this.sink_0.poll_ready(cx)?, this.sink_1.poll_ready(cx)?,);
43 Poll::Ready(Ok(()))
44 }
45 fn start_send(self: Pin<&mut Self>, item: (Item0, Item1)) -> Result<(), Self::Error> {
46 let this = self.project();
47 this.sink_0.start_send(item.0)?;
48 this.sink_1.start_send(item.1)?;
49 Ok(())
50 }
51 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52 let this = self.project();
53 ready_both!(this.sink_0.poll_flush(cx)?, this.sink_1.poll_flush(cx)?,);
54 Poll::Ready(Ok(()))
55 }
56 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 let this = self.project();
58 ready_both!(this.sink_0.poll_close(cx)?, this.sink_1.poll_close(cx)?,);
59 Poll::Ready(Ok(()))
60 }
61}