1use core::pin::Pin;
3use core::task::{Context, Poll, ready};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, SinkBuild};
8
9pin_project! {
10 #[must_use = "sinks do nothing unless polled"]
14 pub struct FlatMap<Si, Func, IntoIter>
15 where
16 IntoIter: IntoIterator,
17 {
18 #[pin]
19 sink: Si,
20 func: Func,
21 iter_next: Option<(IntoIter::IntoIter, IntoIter::Item)>,
23 }
24}
25
26impl<Si, Func, IntoIter> FlatMap<Si, Func, IntoIter>
27where
28 IntoIter: IntoIterator,
29{
30 pub fn new<Item>(func: Func, sink: Si) -> Self
32 where
33 Self: Sink<Item>,
34 {
35 Self {
36 sink,
37 func,
38 iter_next: None,
39 }
40 }
41
42 fn poll_ready_impl(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
43 where
44 Si: Sink<IntoIter::Item>,
45 {
46 let mut this = self.project();
47
48 while this.iter_next.is_some() {
49 ready!(this.sink.as_mut().poll_ready(cx))?;
51
52 let (mut iter, next) = this.iter_next.take().unwrap();
54 this.sink.as_mut().start_send(next)?;
55
56 *this.iter_next = iter.next().map(|next| (iter, next));
58 }
59
60 Poll::Ready(Ok(()))
61 }
62}
63
64impl<Si, Func, Item, IntoIter> Sink<Item> for FlatMap<Si, Func, IntoIter>
65where
66 Si: Sink<IntoIter::Item>,
67 Func: FnMut(Item) -> IntoIter,
68 IntoIter: IntoIterator,
69{
70 type Error = Si::Error;
71
72 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 self.poll_ready_impl(cx)
74 }
75
76 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
77 let this = self.project();
78
79 assert!(
80 this.iter_next.is_none(),
81 "Sink not ready: `poll_ready` must be called and return `Ready` before `start_send` is called."
82 );
83 let mut iter = (this.func)(item).into_iter();
84 *this.iter_next = iter.next().map(|next| (iter, next));
85 Ok(())
86 }
87
88 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89 ready!(self.as_mut().poll_ready_impl(cx)?);
90 self.project().sink.poll_flush(cx)
91 }
92
93 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 ready!(self.as_mut().poll_ready_impl(cx)?);
95 self.project().sink.poll_close(cx)
96 }
97}
98
/// Builder stage that holds the mapping function for a `FlatMap` together with
/// the preceding stage of the chain.
pub struct FlatMapBuilder<Prev, Func> {
    /// The preceding builder stage.
    pub(crate) prev: Prev,
    /// The function handed to the `FlatMap` when the chain is built.
    pub(crate) func: Func,
}
104impl<Prev, Func, IntoIter> SinkBuild for FlatMapBuilder<Prev, Func>
105where
106 Prev: SinkBuild,
107 Func: FnMut(Prev::Item) -> IntoIter,
108 IntoIter: IntoIterator,
109{
110 type Item = IntoIter::Item;
111
112 type Output<Next: Sink<IntoIter::Item>> = Prev::Output<FlatMap<Next, Func, IntoIter>>;
113
114 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
115 where
116 Next: Sink<IntoIter::Item>,
117 {
118 self.prev.send_to(FlatMap::new(self.func, next))
119 }
120}