dfir_rs/compiled/push/
resolve_futures.rs

1use std::cell::RefMut;
2use std::pin::Pin;
3use std::task::{Context, Poll, Waker, ready};
4
5use futures::Stream;
6use futures::sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Special sink for the `resolve_futures` and `resolve_futures_ordered` operators.
11    #[must_use = "sinks do nothing unless polled"]
12    pub struct ResolveFutures<'ctx, Si, Queue> {
13        #[pin]
14        sink: Si,
15        queue: RefMut<'ctx, Queue>,
16        subgraph_waker: Waker,
17    }
18}
19
20impl<'ctx, Si, Queue> ResolveFutures<'ctx, Si, Queue> {
21    /// Create with the given queue and following sink.
22    pub fn new(queue: RefMut<'ctx, Queue>, subgraph_waker: Waker, sink: Si) -> Self {
23        Self {
24            sink,
25            queue,
26            subgraph_waker,
27        }
28    }
29
30    /// Empties any ready items from the queue into the following sink.
31    fn empty_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
32    where
33        Si: Sink<Queue::Item>,
34        Queue: Stream + Unpin,
35    {
36        let mut this = self.project();
37
38        loop {
39            // Ensure the following sink is ready.
40            ready!(this.sink.as_mut().poll_ready(cx))?;
41
42            if let Poll::Ready(Some(out)) = Stream::poll_next(
43                Pin::new(&mut **this.queue),
44                &mut Context::from_waker(this.subgraph_waker),
45            ) {
46                this.sink.as_mut().start_send(out)?;
47            } else {
48                return Poll::Ready(Ok(()));
49            }
50        }
51    }
52}
53
54impl<'ctx, Si, Queue, Fut> Sink<Fut> for ResolveFutures<'ctx, Si, Queue>
55where
56    Si: Sink<Fut::Output>,
57    Queue: Extend<Fut> + Stream<Item = Fut::Output> + Unpin,
58    Fut: Future,
59{
60    type Error = Si::Error;
61
62    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63        self.empty_ready(cx)
64    }
65    fn start_send(self: Pin<&mut Self>, item: Fut) -> Result<(), Self::Error> {
66        let mut this = self.project();
67
68        this.queue.extend(std::iter::once(item));
69        // We MUST poll the queue stream to ensure that the futures begin.
70        // We use `this.subgraph_waker` to poll the queue stream, which means the futures are driven
71        // by the subgraph's own waker. This allows the subgraph execution to continue without waiting
72        // for the queued futures to complete; the subgraph does not block ("yield") on their readiness.
73        // If we instead used `cx.waker()`, the subgraph execution would yield ("block") until all queued
74        // futures are ready, effectively pausing subgraph progress until completion of those futures.
75        // Choose the waker based on whether you want subgraph execution to proceed independently of
76        // the queued futures, or to wait for them to complete before continuing.
77        if let Poll::Ready(Some(out)) = Stream::poll_next(
78            Pin::new(&mut **this.queue),
79            &mut Context::from_waker(this.subgraph_waker),
80        ) {
81            this.sink.as_mut().start_send(out)?;
82        }
83        Ok(())
84    }
85    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        ready!(self.as_mut().empty_ready(cx))?;
87        self.project().sink.poll_flush(cx)
88    }
89    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90        ready!(self.as_mut().empty_ready(cx))?;
91        self.project().sink.poll_close(cx)
92    }
93}