dfir_rs/compiled/push/
resolve_futures.rs1use std::pin::Pin;
2use std::task::{Context, Poll, Waker, ready};
3
4use futures::sink::Sink;
5use futures::stream::{FusedStream, Stream};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 #[must_use = "sinks do nothing unless polled"]
13 pub struct ResolveFutures<'ctx, Si, Queue> {
14 #[pin]
15 sink: Si,
16 queue: &'ctx mut Queue,
17 subgraph_waker: Option<Waker>,
20 }
21}
22
23impl<'ctx, Si, Queue> ResolveFutures<'ctx, Si, Queue> {
24 pub fn new(queue: &'ctx mut Queue, subgraph_waker: Option<Waker>, sink: Si) -> Self {
26 Self {
27 sink,
28 queue,
29 subgraph_waker,
30 }
31 }
32
33 fn empty_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
35 where
36 Si: Sink<Queue::Item>,
37 Queue: FusedStream + Unpin,
38 {
39 let mut this = self.project();
40
41 loop {
42 ready!(this.sink.as_mut().poll_ready(cx))?;
44
45 let poll_result = if let Some(w) = this.subgraph_waker.as_ref() {
46 Stream::poll_next(
47 Pin::new(&mut **this.queue),
48 &mut Context::<'_>::from_waker(w),
49 )
50 } else {
51 Stream::poll_next(Pin::new(&mut **this.queue), cx)
52 };
53
54 match poll_result {
55 Poll::Ready(Some(out)) => {
56 this.sink.as_mut().start_send(out)?;
57 }
58 Poll::Ready(None) => {
59 return Poll::Ready(Ok(()));
60 }
61 Poll::Pending => {
62 if this.subgraph_waker.is_some() {
63 return Poll::Ready(Ok(())); } else {
65 return Poll::Pending;
66 }
67 }
68 }
69 }
70 }
71}
72
73impl<'ctx, Si, Queue, Fut> Sink<Fut> for ResolveFutures<'ctx, Si, Queue>
74where
75 Si: Sink<Fut::Output>,
76 Queue: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
77 Fut: Future,
78{
79 type Error = Si::Error;
80
81 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 self.empty_ready(cx)
83 }
84
85 fn start_send(self: Pin<&mut Self>, item: Fut) -> Result<(), Self::Error> {
86 let mut this = self.project();
87
88 this.queue.extend(std::iter::once(item));
89
90 if let Some(waker) = this.subgraph_waker.as_ref() {
91 if let Poll::Ready(Some(out)) =
100 Stream::poll_next(Pin::new(&mut **this.queue), &mut Context::from_waker(waker))
101 {
102 this.sink.as_mut().start_send(out)?;
103 }
104 }
105
106 Ok(())
107 }
108
109 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110 ready!(self.as_mut().empty_ready(cx))?;
111 self.project().sink.poll_flush(cx)
112 }
113
114 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 ready!(self.as_mut().empty_ready(cx))?;
116 self.project().sink.poll_close(cx)
117 }
118}