dfir_rs/compiled/push/
resolve_futures.rs

1use std::pin::Pin;
2use std::task::{Context, Poll, Waker, ready};
3
4use futures::sink::Sink;
5use futures::stream::{FusedStream, Stream};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Special sink for the `resolve_futures[_blocking][_ordered]` operators.
10    ///
11    /// `Queue` may be either [`futures::stream::FuturesOrdered`] or [`futures::stream::FuturesUnordered`].
12    #[must_use = "sinks do nothing unless polled"]
13    pub struct ResolveFutures<'ctx, Si, Queue> {
14        #[pin]
15        sink: Si,
16        queue: &'ctx mut Queue,
17        // If `Some`, this waker will schedule future ticks, so all futures should be driven
18        // by it. If `None`, the subgraph execution should block until all futures are resolved.
19        subgraph_waker: Option<Waker>,
20    }
21}
22
23impl<'ctx, Si, Queue> ResolveFutures<'ctx, Si, Queue> {
24    /// Create with the given queue and following sink.
25    pub fn new(queue: &'ctx mut Queue, subgraph_waker: Option<Waker>, sink: Si) -> Self {
26        Self {
27            sink,
28            queue,
29            subgraph_waker,
30        }
31    }
32
33    /// Empties any ready items from the queue into the following sink.
34    fn empty_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
35    where
36        Si: Sink<Queue::Item>,
37        Queue: FusedStream + Unpin,
38    {
39        let mut this = self.project();
40
41        loop {
42            // Ensure the following sink is ready.
43            ready!(this.sink.as_mut().poll_ready(cx))?;
44
45            let poll_result = if let Some(w) = this.subgraph_waker.as_ref() {
46                Stream::poll_next(
47                    Pin::new(&mut **this.queue),
48                    &mut Context::<'_>::from_waker(w),
49                )
50            } else {
51                Stream::poll_next(Pin::new(&mut **this.queue), cx)
52            };
53
54            match poll_result {
55                Poll::Ready(Some(out)) => {
56                    this.sink.as_mut().start_send(out)?;
57                }
58                Poll::Ready(None) => {
59                    return Poll::Ready(Ok(()));
60                }
61                Poll::Pending => {
62                    if this.subgraph_waker.is_some() {
63                        return Poll::Ready(Ok(())); // we will be re-woken on a future tick
64                    } else {
65                        return Poll::Pending;
66                    }
67                }
68            }
69        }
70    }
71}
72
73impl<'ctx, Si, Queue, Fut> Sink<Fut> for ResolveFutures<'ctx, Si, Queue>
74where
75    Si: Sink<Fut::Output>,
76    Queue: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
77    Fut: Future,
78{
79    type Error = Si::Error;
80
81    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        self.empty_ready(cx)
83    }
84
85    fn start_send(self: Pin<&mut Self>, item: Fut) -> Result<(), Self::Error> {
86        let mut this = self.project();
87
88        this.queue.extend(std::iter::once(item));
89
90        if let Some(waker) = this.subgraph_waker.as_ref() {
91            // We MUST poll the queue stream to ensure that the futures begin.
92            // We use `this.subgraph_waker` to poll the queue stream, which means the futures are driven
93            // by the subgraph's own waker. This allows the subgraph execution to continue without waiting
94            // for the queued futures to complete; the subgraph does not block ("yield") on their readiness.
95            // If we instead used `cx.waker()`, the subgraph execution would yield ("block") until all queued
96            // futures are ready, effectively pausing subgraph progress until completion of those futures.
97            // Choose the waker based on whether you want subgraph execution to proceed independently of
98            // the queued futures, or to wait for them to complete before continuing.
99            if let Poll::Ready(Some(out)) =
100                Stream::poll_next(Pin::new(&mut **this.queue), &mut Context::from_waker(waker))
101            {
102                this.sink.as_mut().start_send(out)?;
103            }
104        }
105
106        Ok(())
107    }
108
109    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110        ready!(self.as_mut().empty_ready(cx))?;
111        self.project().sink.poll_flush(cx)
112    }
113
114    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        ready!(self.as_mut().empty_ready(cx))?;
116        self.project().sink.poll_close(cx)
117    }
118}