1use core::future::Future;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll, ready};
7
8use futures_util::{FutureExt, Sink, Stream};
9use pin_project_lite::pin_project;
10
11pin_project! {
12 #[project = LazySinkProj]
13 enum LazySinkState<Func, Fut, Si, Item> {
14 Uninit {
15 func: Option<Func>,
17 },
18 Thunkulating {
19 #[pin]
21 future: Fut,
22 item: Option<Item>,
24 },
25 Done {
26 #[pin]
28 sink: Si,
29 buf: Option<Item>,
31 },
32 }
33}
34
35pin_project! {
36 pub struct LazySink<Func, Fut, Si, Item> {
38 #[pin]
39 state: LazySinkState<Func, Fut, Si, Item>,
40 }
41}
42
43impl<Func, Fut, Si, Item, Error> LazySink<Func, Fut, Si, Item>
44where
45 Func: FnOnce() -> Fut,
46 Fut: Future<Output = Result<Si, Error>>,
47 Si: Sink<Item>,
48 Error: From<Si::Error>,
49{
50 pub fn new(func: Func) -> Self {
52 Self {
53 state: LazySinkState::Uninit { func: Some(func) },
54 }
55 }
56
57 fn poll_sink_op(
58 self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 sink_op: impl FnOnce(Pin<&mut Si>, &mut Context<'_>) -> Poll<Result<(), Si::Error>>,
61 ) -> Poll<Result<(), Error>> {
62 let mut this = self.project();
63
64 if let LazySinkProj::Uninit { func: _ } = this.state.as_mut().project() {
65 return Poll::Ready(Ok(())); }
67
68 if let LazySinkProj::Thunkulating { mut future, item } = this.state.as_mut().project() {
69 let sink = ready!(future.poll_unpin(cx))?;
70 let buf = Some(item.take().unwrap());
71 this.state.as_mut().set(LazySinkState::Done { sink, buf });
72 }
73
74 if let LazySinkProj::Done { mut sink, buf } = this.state.as_mut().project() {
75 if buf.is_some() {
76 let () = ready!(sink.as_mut().poll_ready(cx)?);
77 let () = sink.as_mut().start_send(buf.take().unwrap())?;
78 }
79 return (sink_op)(sink, cx).map_err(From::from);
80 }
81
82 panic!("`LazySink` in invalid state.");
83 }
84}
85
86impl<Func, Fut, Si, Item, Error> Sink<Item> for LazySink<Func, Fut, Si, Item>
87where
88 Func: FnOnce() -> Fut,
89 Fut: Future<Output = Result<Si, Error>>,
90 Si: Sink<Item>,
91 Error: From<Si::Error>,
92{
93 type Error = Error;
94
95 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96 self.poll_sink_op(cx, Sink::poll_ready)
97 }
98
99 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
100 let mut this = self.project();
101
102 if let LazySinkProj::Uninit { func } = this.state.as_mut().project() {
103 let func = func.take().unwrap();
104 let future = (func)();
105 let item = Some(item);
106 this.state
107 .as_mut()
108 .set(LazySinkState::Thunkulating { future, item });
109 Ok(())
110 } else if let LazySinkProj::Done { sink, buf: _buf } = this.state.project() {
111 debug_assert!(_buf.is_none());
112 sink.start_send(item).map_err(From::from)
113 } else {
114 panic!("`LazySink` not ready.");
115 }
116 }
117
118 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 self.poll_sink_op(cx, Sink::poll_flush)
120 }
121
122 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123 self.poll_sink_op(cx, Sink::poll_close)
124 }
125}
126
127pin_project! {
128 #[project = LazySourceProj]
129 enum LazySourceState<St, Fut, Func> {
130 Uninit {
131 func: Option<Func>,
133 },
134 Thunkulating {
135 #[pin]
137 fut: Fut
138 },
139 Done {
140 #[pin]
141 stream: St,
142 },
143 }
144}
145
146pin_project! {
147 pub struct LazySource<ThunkFunc, StreamType, PreparingFutureType, StreamItemType> {
149 #[pin]
150 state: LazySourceState<StreamType, PreparingFutureType, ThunkFunc>,
151 _phantom: PhantomData<StreamItemType>,
152 }
153}
154
155impl<F, S, Fut, E, T> LazySource<F, S, Fut, T>
156where
157 F: FnOnce() -> Fut,
158 Fut: Future<Output = Result<S, E>>,
159 S: Stream<Item = T>,
160{
161 pub fn new(thunk: F) -> Self {
163 Self {
164 state: LazySourceState::Uninit { func: Some(thunk) },
165 _phantom: Default::default(),
166 }
167 }
168}
169
170impl<F, S, Fut, E, T> Stream for LazySource<F, S, Fut, T>
171where
172 F: FnOnce() -> Fut,
173 Fut: Future<Output = Result<S, E>>,
174 S: Stream<Item = T>,
175{
176 type Item = S::Item;
177
178 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179 if let LazySourceProj::Uninit { func } = self.as_mut().project().state.project() {
180 let func = func.take().unwrap();
181 self.as_mut()
182 .project()
183 .state
184 .set(LazySourceState::Thunkulating { fut: func() });
185 }
186
187 if let LazySourceProj::Thunkulating { fut } = self.as_mut().project().state.project() {
188 match ready!(fut.poll(cx)) {
189 Ok(stream) => {
190 self.as_mut()
191 .project()
192 .state
193 .set(LazySourceState::Done { stream });
194 }
195 Err(_e) => {
196 return Poll::Ready(None);
198 }
199 }
200 }
201
202 if let LazySourceProj::Done { stream } = self.as_mut().project().state.project() {
203 return stream.poll_next(cx);
204 }
205
206 panic!("`LazySource` in invalid state.");
207 }
208}
209
210#[cfg(test)]
211mod test {
212 use core::cell::RefCell;
213 use core::convert::Infallible;
214 use core::pin::pin;
215 use core::task::Context;
216
217 use futures_util::{Sink, SinkExt, StreamExt};
218
219 use super::*;
220 use crate::for_each::ForEach;
221
222 #[tokio::test]
223 async fn test_lazy_sink() {
224 let test_data = b"test";
225 let output = RefCell::new(Vec::new());
226
227 let mut lazy_sink = LazySink::new(|| {
228 Box::pin(async {
229 Result::<_, Infallible>::Ok(ForEach::new(|item| {
230 output.borrow_mut().extend_from_slice(item);
231 }))
232 })
233 });
234
235 SinkExt::send(&mut lazy_sink, test_data.as_slice())
236 .await
237 .unwrap();
238
239 SinkExt::send(&mut lazy_sink, test_data.as_slice())
240 .await
241 .unwrap();
242
243 SinkExt::flush(&mut lazy_sink).await.unwrap();
244
245 SinkExt::close(&mut lazy_sink).await.unwrap();
246
247 assert_eq!(&output.borrow().as_slice()[0..test_data.len()], test_data);
248 assert_eq!(&output.borrow().as_slice()[test_data.len()..], test_data);
249 }
250
251 #[test]
252 fn test_lazy_sink_fut_err() {
253 enum DummySink {}
254 impl Sink<()> for DummySink {
255 type Error = &'static str;
256 fn poll_ready(
257 self: Pin<&mut Self>,
258 _cx: &mut Context<'_>,
259 ) -> Poll<Result<(), Self::Error>> {
260 panic!()
261 }
262 fn start_send(self: Pin<&mut Self>, _item: ()) -> Result<(), Self::Error> {
263 panic!()
264 }
265 fn poll_flush(
266 self: Pin<&mut Self>,
267 _cx: &mut Context<'_>,
268 ) -> Poll<Result<(), Self::Error>> {
269 panic!()
270 }
271 fn poll_close(
272 self: Pin<&mut Self>,
273 _cx: &mut Context<'_>,
274 ) -> Poll<Result<(), Self::Error>> {
275 panic!()
276 }
277 }
278
279 let mut lazy_sink = pin!(LazySink::new(|| async {
280 Result::<DummySink, _>::Err("Fail!")
281 }));
282
283 let cx = &mut Context::from_waker(futures_task::noop_waker_ref());
284
285 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
286 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
287 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
288 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
289 assert_eq!(Ok(()), lazy_sink.as_mut().start_send(())); assert_eq!(Poll::Ready(Err("Fail!")), lazy_sink.as_mut().poll_flush(cx)); }
292
293 #[test]
294 fn test_lazy_sink_sink_err() {
295 let mut lazy_sink = pin!(LazySink::new(|| async {
296 Ok(futures_util::sink::unfold((), |(), _item| async {
297 Err("Fail!")
298 }))
299 }));
300
301 let cx = &mut Context::from_waker(futures_task::noop_waker_ref());
302
303 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
304 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
305 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
306 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
307 assert_eq!(Ok(()), lazy_sink.as_mut().start_send(())); assert_eq!(Poll::Ready(Err("Fail!")), lazy_sink.as_mut().poll_flush(cx)); }
310
311 #[test]
312 fn test_lazy_sink_good() {
313 let test_data = b"test";
314
315 let mut lazy_sink = pin!(LazySink::new(|| async {
316 Result::<_, ()>::Ok(futures_util::sink::unfold((), |(), item| async move {
317 assert_eq!(item, test_data);
318 Ok(())
319 }))
320 }));
321
322 let cx = &mut Context::from_waker(futures_task::noop_waker_ref());
323
324 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
325 assert_eq!(Ok(()), lazy_sink.as_mut().start_send(test_data.as_slice()));
326 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
327 assert_eq!(Ok(()), lazy_sink.as_mut().start_send(test_data.as_slice()));
328 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
329 assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
330 }
331
332 #[tokio::test]
333 async fn test_lazy_source() {
334 let test_data = b"test";
335
336 let mut lazy_source = LazySource::new(|| {
337 Box::pin(async {
338 Result::<_, Infallible>::Ok(futures_util::stream::iter(vec![test_data.as_slice()]))
339 })
340 });
341
342 assert_eq!(lazy_source.next().await.unwrap(), test_data);
343 }
344
345 #[test]
346 fn test_lazy_source_err() {
347 let mut lazy_source = pin!(LazySource::new(|| async {
348 Result::<futures_util::stream::Empty<()>, _>::Err("Fail!")
349 }));
350
351 let cx = &mut Context::from_waker(futures_task::noop_waker_ref());
352
353 assert_eq!(Poll::Ready(None), lazy_source.as_mut().poll_next(cx));
354 }
355
356 #[test]
357 fn test_lazy_source_good() {
358 let test_data = b"test";
359
360 let mut lazy_source = pin!(LazySource::new(|| async {
361 Result::<_, Infallible>::Ok(futures_util::stream::iter(test_data))
362 }));
363
364 let cx = &mut Context::from_waker(futures_task::noop_waker_ref());
365
366 assert_eq!(Poll::Ready(Some(&b't')), lazy_source.as_mut().poll_next(cx));
367 assert_eq!(Poll::Ready(Some(&b'e')), lazy_source.as_mut().poll_next(cx));
368 assert_eq!(Poll::Ready(Some(&b's')), lazy_source.as_mut().poll_next(cx));
369 assert_eq!(Poll::Ready(Some(&b't')), lazy_source.as_mut().poll_next(cx));
370 assert_eq!(Poll::Ready(None), lazy_source.as_mut().poll_next(cx));
371 }
372}