// sinktools/lazy.rs

//! [`LazySink`], [`LazySource`], and related items.

use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll, ready};

use futures_util::{FutureExt, Sink, Stream};
use pin_project_lite::pin_project;

11pin_project! {
12    #[project = LazySinkProj]
13    enum LazySinkState<Func, Fut, Si, Item> {
14        Uninit {
15            // Initialization func, always `Some`
16            func: Option<Func>,
17        },
18        Thunkulating {
19            // Initialization future.
20            #[pin]
21            future: Fut,
22             // First item sent, always `Some`
23            item: Option<Item>,
24        },
25        Done {
26            // The final sink.
27            #[pin]
28            sink: Si,
29            // First item sent, then after always `None`.
30            buf: Option<Item>,
31        },
32    }
33}
34
35pin_project! {
36    /// A lazy sink will attempt to get a [`Sink`] using the init `Func` when the first item is sent into it.
37    pub struct LazySink<Func, Fut, Si, Item> {
38        #[pin]
39        state: LazySinkState<Func, Fut, Si, Item>,
40    }
41}
42
43impl<Func, Fut, Si, Item, Error> LazySink<Func, Fut, Si, Item>
44where
45    Func: FnOnce() -> Fut,
46    Fut: Future<Output = Result<Si, Error>>,
47    Si: Sink<Item>,
48    Error: From<Si::Error>,
49{
50    /// Creates a new `LazySink` with the given initialization `func`.
51    pub fn new(func: Func) -> Self {
52        Self {
53            state: LazySinkState::Uninit { func: Some(func) },
54        }
55    }
56
57    fn poll_sink_op(
58        self: Pin<&mut Self>,
59        cx: &mut Context<'_>,
60        sink_op: impl FnOnce(Pin<&mut Si>, &mut Context<'_>) -> Poll<Result<(), Si::Error>>,
61    ) -> Poll<Result<(), Error>> {
62        let mut this = self.project();
63
64        if let LazySinkProj::Uninit { func: _ } = this.state.as_mut().project() {
65            return Poll::Ready(Ok(())); // Lazy
66        }
67
68        if let LazySinkProj::Thunkulating { mut future, item } = this.state.as_mut().project() {
69            let sink = ready!(future.poll_unpin(cx))?;
70            let buf = Some(item.take().unwrap());
71            this.state.as_mut().set(LazySinkState::Done { sink, buf });
72        }
73
74        if let LazySinkProj::Done { mut sink, buf } = this.state.as_mut().project() {
75            if buf.is_some() {
76                let () = ready!(sink.as_mut().poll_ready(cx)?);
77                let () = sink.as_mut().start_send(buf.take().unwrap())?;
78            }
79            return (sink_op)(sink, cx).map_err(From::from);
80        }
81
82        panic!("`LazySink` in invalid state.");
83    }
84}
85
86impl<Func, Fut, Si, Item, Error> Sink<Item> for LazySink<Func, Fut, Si, Item>
87where
88    Func: FnOnce() -> Fut,
89    Fut: Future<Output = Result<Si, Error>>,
90    Si: Sink<Item>,
91    Error: From<Si::Error>,
92{
93    type Error = Error;
94
95    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96        self.poll_sink_op(cx, Sink::poll_ready)
97    }
98
99    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
100        let mut this = self.project();
101
102        if let LazySinkProj::Uninit { func } = this.state.as_mut().project() {
103            let func = func.take().unwrap();
104            let future = (func)();
105            let item = Some(item);
106            this.state
107                .as_mut()
108                .set(LazySinkState::Thunkulating { future, item });
109            Ok(())
110        } else if let LazySinkProj::Done { sink, buf: _buf } = this.state.project() {
111            debug_assert!(_buf.is_none());
112            sink.start_send(item).map_err(From::from)
113        } else {
114            panic!("`LazySink` not ready.");
115        }
116    }
117
118    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119        self.poll_sink_op(cx, Sink::poll_flush)
120    }
121
122    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123        self.poll_sink_op(cx, Sink::poll_close)
124    }
125}
126
127pin_project! {
128    #[project = LazySourceProj]
129    enum LazySourceState<St, Fut, Func> {
130        Uninit {
131            // Initialization func, always `Some`.
132            func: Option<Func>,
133        },
134        Thunkulating {
135            // Initialization future.
136            #[pin]
137            fut: Fut
138        },
139        Done {
140            #[pin]
141            stream: St,
142        },
143    }
144}
145
146pin_project! {
147    /// A lazy source will attempt to acquire a stream using the thunk when the first item is pulled from it
148    pub struct LazySource<ThunkFunc, StreamType, PreparingFutureType, StreamItemType> {
149        #[pin]
150        state: LazySourceState<StreamType, PreparingFutureType, ThunkFunc>,
151        _phantom: PhantomData<StreamItemType>,
152    }
153}
154
155impl<F, S, Fut, E, T> LazySource<F, S, Fut, T>
156where
157    F: FnOnce() -> Fut,
158    Fut: Future<Output = Result<S, E>>,
159    S: Stream<Item = T>,
160{
161    /// Creates a new [`LazySource`]. Thunk should be something callable that returns a future that resolves to a [`Stream`] that the lazy sink will forward items to.
162    pub fn new(thunk: F) -> Self {
163        Self {
164            state: LazySourceState::Uninit { func: Some(thunk) },
165            _phantom: Default::default(),
166        }
167    }
168}
169
170impl<F, S, Fut, E, T> Stream for LazySource<F, S, Fut, T>
171where
172    F: FnOnce() -> Fut,
173    Fut: Future<Output = Result<S, E>>,
174    S: Stream<Item = T>,
175{
176    type Item = S::Item;
177
178    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179        if let LazySourceProj::Uninit { func } = self.as_mut().project().state.project() {
180            let func = func.take().unwrap();
181            self.as_mut()
182                .project()
183                .state
184                .set(LazySourceState::Thunkulating { fut: func() });
185        }
186
187        if let LazySourceProj::Thunkulating { fut } = self.as_mut().project().state.project() {
188            match ready!(fut.poll(cx)) {
189                Ok(stream) => {
190                    self.as_mut()
191                        .project()
192                        .state
193                        .set(LazySourceState::Done { stream });
194                }
195                Err(_e) => {
196                    // TODO(mingwei): handle errors.
197                    return Poll::Ready(None);
198                }
199            }
200        }
201
202        if let LazySourceProj::Done { stream } = self.as_mut().project().state.project() {
203            return stream.poll_next(cx);
204        }
205
206        panic!("`LazySource` in invalid state.");
207    }
208}
209
#[cfg(test)]
mod test {
    use core::cell::RefCell;
    use core::convert::Infallible;
    use core::pin::pin;
    use core::task::Context;

    use futures_util::{Sink, SinkExt, StreamExt};

    use super::*;
    use crate::for_each::ForEach;

    /// End-to-end: two sends, a flush and a close through `SinkExt` all reach the inner sink.
    #[tokio::test]
    async fn test_lazy_sink() {
        let test_data = b"test";
        let output = RefCell::new(Vec::new());

        let mut lazy_sink = LazySink::new(|| {
            Box::pin(async {
                Result::<_, Infallible>::Ok(ForEach::new(|item| {
                    output.borrow_mut().extend_from_slice(item);
                }))
            })
        });

        SinkExt::send(&mut lazy_sink, test_data.as_slice())
            .await
            .unwrap();

        SinkExt::send(&mut lazy_sink, test_data.as_slice())
            .await
            .unwrap();

        SinkExt::flush(&mut lazy_sink).await.unwrap();

        SinkExt::close(&mut lazy_sink).await.unwrap();

        assert_eq!(&output.borrow().as_slice()[0..test_data.len()], test_data);
        assert_eq!(&output.borrow().as_slice()[test_data.len()..], test_data);
    }

    /// An error from the init future surfaces on the first poll after an item is sent.
    #[test]
    fn test_lazy_sink_fut_err() {
        // Uninhabited sink type: the init future always fails, so none of these methods can
        // ever be reached.
        enum DummySink {}
        impl Sink<()> for DummySink {
            type Error = &'static str;
            fn poll_ready(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<(), Self::Error>> {
                panic!()
            }
            fn start_send(self: Pin<&mut Self>, _item: ()) -> Result<(), Self::Error> {
                panic!()
            }
            fn poll_flush(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<(), Self::Error>> {
                panic!()
            }
            fn poll_close(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<Result<(), Self::Error>> {
                panic!()
            }
        }

        let mut lazy_sink = pin!(LazySink::new(|| async {
            Result::<DummySink, _>::Err("Fail!")
        }));

        let cx = &mut Context::from_waker(futures_task::noop_waker_ref());

        // Before any item is sent, every poll succeeds without initializing.
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
        assert_eq!(Ok(()), lazy_sink.as_mut().start_send(())); // Works because item is buffered.
        assert_eq!(Poll::Ready(Err("Fail!")), lazy_sink.as_mut().poll_flush(cx)); // Now anything fails.
    }

    /// An error from the inner sink surfaces once the buffered first item is forwarded.
    #[test]
    fn test_lazy_sink_sink_err() {
        let mut lazy_sink = pin!(LazySink::new(|| async {
            Ok(futures_util::sink::unfold((), |(), _item| async {
                Err("Fail!")
            }))
        }));

        let cx = &mut Context::from_waker(futures_task::noop_waker_ref());

        // Before any item is sent, every poll succeeds without initializing.
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
        assert_eq!(Ok(()), lazy_sink.as_mut().start_send(())); // Works because item is buffered.
        assert_eq!(Poll::Ready(Err("Fail!")), lazy_sink.as_mut().poll_flush(cx)); // Now anything fails.
    }

    /// Happy path driven by hand: send, flush, send, flush, close all succeed.
    #[test]
    fn test_lazy_sink_good() {
        let test_data = b"test";

        let mut lazy_sink = pin!(LazySink::new(|| async {
            Result::<_, ()>::Ok(futures_util::sink::unfold((), |(), item| async move {
                assert_eq!(item, test_data);
                Ok(())
            }))
        }));

        let cx = &mut Context::from_waker(futures_task::noop_waker_ref());

        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_ready(cx));
        assert_eq!(Ok(()), lazy_sink.as_mut().start_send(test_data.as_slice()));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
        assert_eq!(Ok(()), lazy_sink.as_mut().start_send(test_data.as_slice()));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_flush(cx));
        assert_eq!(Poll::Ready(Ok(())), lazy_sink.as_mut().poll_close(cx));
    }

    /// End-to-end: the first item of the lazily created stream is yielded through `StreamExt`.
    #[tokio::test]
    async fn test_lazy_source() {
        let test_data = b"test";

        let mut lazy_source = LazySource::new(|| {
            Box::pin(async {
                Result::<_, Infallible>::Ok(futures_util::stream::iter(vec![test_data.as_slice()]))
            })
        });

        assert_eq!(lazy_source.next().await.unwrap(), test_data);
    }

    /// A failing init future makes the source report end-of-stream.
    #[test]
    fn test_lazy_source_err() {
        let mut lazy_source = pin!(LazySource::new(|| async {
            Result::<futures_util::stream::Empty<()>, _>::Err("Fail!")
        }));

        let cx = &mut Context::from_waker(futures_task::noop_waker_ref());

        assert_eq!(Poll::Ready(None), lazy_source.as_mut().poll_next(cx));
    }

    /// Happy path driven by hand: every item is yielded in order, followed by `None`.
    #[test]
    fn test_lazy_source_good() {
        let test_data = b"test";

        let mut lazy_source = pin!(LazySource::new(|| async {
            Result::<_, Infallible>::Ok(futures_util::stream::iter(test_data))
        }));

        let cx = &mut Context::from_waker(futures_task::noop_waker_ref());

        assert_eq!(Poll::Ready(Some(&b't')), lazy_source.as_mut().poll_next(cx));
        assert_eq!(Poll::Ready(Some(&b'e')), lazy_source.as_mut().poll_next(cx));
        assert_eq!(Poll::Ready(Some(&b's')), lazy_source.as_mut().poll_next(cx));
        assert_eq!(Poll::Ready(Some(&b't')), lazy_source.as_mut().poll_next(cx));
        assert_eq!(Poll::Ready(None), lazy_source.as_mut().poll_next(cx));
    }
}