dfir_rs/compiled/push/
persist.rs1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use futures::sink::Sink;
5use pin_project_lite::pin_project;
6
pin_project! {
    /// Sink adapter that stores every item it is sent in a borrowed `Vec` and
    /// forwards the item to the wrapped sink.
    ///
    /// Before forwarding readiness, flush, or close to the wrapped sink, any
    /// items already in the `Vec` at or after `replay_idx` are cloned and sent
    /// to the wrapped sink first.
    #[must_use = "sinks do nothing unless polled"]
    pub struct Persist<'ctx, Si, Item> {
        // The wrapped sink; structurally pinned so its poll methods can be
        // called through the projection.
        #[pin]
        sink: Si,
        // Borrowed storage; every item passed to `start_send` is pushed here.
        vec: &'ctx mut Vec<Item>,
        // Index into `vec` of the next stored item to replay into `sink`.
        // Equals `vec.len()` once replay has caught up.
        replay_idx: usize,
    }
}
17
18impl<'ctx, Si, Item> Persist<'ctx, Si, Item> {
19 pub fn new(vec: &'ctx mut Vec<Item>, replay_idx: usize, sink: Si) -> Self
21 where
22 Self: Sink<Item>,
23 {
24 debug_assert!(replay_idx <= vec.len());
25
26 Self {
27 sink,
28 vec,
29 replay_idx,
30 }
31 }
32
33 fn empty_replay(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>
34 where
35 Si: Sink<Item>,
36 Item: Clone,
37 {
38 let mut this = self.project();
39 while let Some(item) = this.vec.get(*this.replay_idx) {
40 ready!(this.sink.as_mut().poll_ready(cx))?;
41 this.sink.as_mut().start_send(item.clone())?;
42 *this.replay_idx += 1;
43 }
44 debug_assert_eq!(this.vec.len(), *this.replay_idx);
45 Poll::Ready(Ok(()))
46 }
47}
48
49impl<'ctx, Si, Item> Sink<Item> for Persist<'ctx, Si, Item>
50where
51 Si: Sink<Item>,
52 Item: Clone,
53{
54 type Error = Si::Error;
55
56 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 ready!(self.as_mut().empty_replay(cx))?;
58 self.project().sink.poll_ready(cx)
59 }
60 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
61 let this = self.project();
62 debug_assert_eq!(this.vec.len(), *this.replay_idx);
63
64 this.vec.push(item.clone());
66 *this.replay_idx += 1;
67
68 this.sink.start_send(item)
69 }
70 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
71 ready!(self.as_mut().empty_replay(cx))?;
73 self.project().sink.poll_flush(cx)
74 }
75 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 ready!(self.as_mut().empty_replay(cx))?;
78 self.project().sink.poll_close(cx)
79 }
80}