dfir_rs/compiled/push/
resolve_futures.rs1use std::cell::RefMut;
2use std::pin::Pin;
3use std::task::{Context, Poll, Waker, ready};
4
5use futures::Stream;
6use futures::sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10 #[must_use = "sinks do nothing unless polled"]
12 pub struct ResolveFutures<'ctx, Si, Queue> {
13 #[pin]
14 sink: Si,
15 queue: RefMut<'ctx, Queue>,
16 subgraph_waker: Waker,
17 }
18}
19
20impl<'ctx, Si, Queue> ResolveFutures<'ctx, Si, Queue> {
21 pub fn new(queue: RefMut<'ctx, Queue>, subgraph_waker: Waker, sink: Si) -> Self {
23 Self {
24 sink,
25 queue,
26 subgraph_waker,
27 }
28 }
29
30 fn empty_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
32 where
33 Si: Sink<Queue::Item>,
34 Queue: Stream + Unpin,
35 {
36 let mut this = self.project();
37
38 loop {
39 ready!(this.sink.as_mut().poll_ready(cx))?;
41
42 if let Poll::Ready(Some(out)) = Stream::poll_next(
43 Pin::new(&mut **this.queue),
44 &mut Context::from_waker(this.subgraph_waker),
45 ) {
46 this.sink.as_mut().start_send(out)?;
47 } else {
48 return Poll::Ready(Ok(()));
49 }
50 }
51 }
52}
53
54impl<'ctx, Si, Queue, Fut> Sink<Fut> for ResolveFutures<'ctx, Si, Queue>
55where
56 Si: Sink<Fut::Output>,
57 Queue: Extend<Fut> + Stream<Item = Fut::Output> + Unpin,
58 Fut: Future,
59{
60 type Error = Si::Error;
61
62 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63 self.empty_ready(cx)
64 }
65 fn start_send(self: Pin<&mut Self>, item: Fut) -> Result<(), Self::Error> {
66 let mut this = self.project();
67
68 this.queue.extend(std::iter::once(item));
69 if let Poll::Ready(Some(out)) = Stream::poll_next(
78 Pin::new(&mut **this.queue),
79 &mut Context::from_waker(this.subgraph_waker),
80 ) {
81 this.sink.as_mut().start_send(out)?;
82 }
83 Ok(())
84 }
85 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 ready!(self.as_mut().empty_ready(cx))?;
87 self.project().sink.poll_flush(cx)
88 }
89 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90 ready!(self.as_mut().empty_ready(cx))?;
91 self.project().sink.poll_close(cx)
92 }
93}