dfir_rs/compiled/push/
persist.rs

1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use futures::sink::Sink;
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Special sink for the `persist` operator.
9    #[must_use = "sinks do nothing unless polled"]
10    pub struct Persist<'ctx, Si, Item> {
11        #[pin]
12        sink: Si,
13        vec: &'ctx mut Vec<Item>,
14        replay_idx: usize,
15    }
16}
17
18impl<'ctx, Si, Item> Persist<'ctx, Si, Item> {
19    /// Create with the given replay and following sink.
20    pub fn new(vec: &'ctx mut Vec<Item>, replay_idx: usize, sink: Si) -> Self
21    where
22        Self: Sink<Item>,
23    {
24        debug_assert!(replay_idx <= vec.len());
25
26        Self {
27            sink,
28            vec,
29            replay_idx,
30        }
31    }
32
33    fn empty_replay(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
34    where
35        Si: Sink<Item>,
36        Item: Clone,
37    {
38        let mut this = self.project();
39        while let Some(item) = this.vec.get(*this.replay_idx) {
40            ready!(this.sink.as_mut().poll_ready(cx))?;
41            this.sink.as_mut().start_send(item.clone())?;
42            *this.replay_idx += 1;
43        }
44        debug_assert_eq!(this.vec.len(), *this.replay_idx);
45        Poll::Ready(Ok(()))
46    }
47}
48
49impl<'ctx, Si, Item> Sink<Item> for Persist<'ctx, Si, Item>
50where
51    Si: Sink<Item>,
52    Item: Clone,
53{
54    type Error = Si::Error;
55
56    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        ready!(self.as_mut().empty_replay(cx))?;
58        self.project().sink.poll_ready(cx)
59    }
60    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
61        let this = self.project();
62        debug_assert_eq!(this.vec.len(), *this.replay_idx);
63
64        // Persist
65        this.vec.push(item.clone());
66        *this.replay_idx += 1;
67
68        this.sink.start_send(item)
69    }
70    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
71        // Ensure all replayed items are sent before flushing the underlying sink.
72        ready!(self.as_mut().empty_replay(cx))?;
73        self.project().sink.poll_flush(cx)
74    }
75    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76        // Ensure all replayed items are sent before closing the underlying sink.
77        ready!(self.as_mut().empty_replay(cx))?;
78        self.project().sink.poll_close(cx)
79    }
80}