dfir_rs/compiled/pull/
lattice_bimorphism.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::Stream;
6use futures::stream::FusedStream;
7use lattices::{LatticeBimorphism, Merge};
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Stream combinator for lattice bimorphism operations.
12    #[must_use = "streams do nothing unless polled"]
13    pub struct LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output> {
14        #[pin]
15        lhs_stream: LhsStream,
16        #[pin]
17        rhs_stream: RhsStream,
18
19        func: Func,
20
21        lhs_state: &'a RefCell<LhsState>,
22        rhs_state: &'a RefCell<RhsState>,
23
24        output: Option<Output>,
25    }
26}
27
28impl<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
29    LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
30where
31    Func: 'a
32        + LatticeBimorphism<LhsState, RhsStream::Item, Output = Output>
33        + LatticeBimorphism<LhsStream::Item, RhsState, Output = Output>,
34    LhsStream: 'a + FusedStream,
35    RhsStream: 'a + FusedStream,
36    LhsState: 'static + Clone,
37    RhsState: 'static + Clone,
38    Output: Merge<Output>,
39{
40    /// Creates a new `LatticeBimorphismStream`.
41    pub fn new(
42        lhs_stream: LhsStream,
43        rhs_stream: RhsStream,
44        func: Func,
45        lhs_state: &'a RefCell<LhsState>,
46        rhs_state: &'a RefCell<RhsState>,
47    ) -> Self {
48        Self {
49            lhs_stream,
50            rhs_stream,
51            func,
52            lhs_state,
53            rhs_state,
54            output: None,
55        }
56    }
57}
58
59impl<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output> Stream
60    for LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
61where
62    Func: 'a
63        + LatticeBimorphism<LhsState, RhsStream::Item, Output = Output>
64        + LatticeBimorphism<LhsStream::Item, RhsState, Output = Output>,
65    LhsStream: 'a + FusedStream,
66    RhsStream: 'a + FusedStream,
67    LhsState: 'static + Clone,
68    RhsState: 'static + Clone,
69    Output: Merge<Output>,
70{
71    type Item = Output;
72
73    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74        let mut this = self.project();
75
76        loop {
77            let lhs_poll = this.lhs_stream.as_mut().poll_next(cx);
78            let lhs_pending = lhs_poll.is_pending();
79            let mut live = false;
80
81            if let Poll::Ready(Some(lhs_item)) = lhs_poll {
82                live = true;
83                let delta = this.func.call(lhs_item, this.rhs_state.borrow().clone());
84                if let Some(output) = this.output.as_mut() {
85                    output.merge(delta);
86                } else {
87                    this.output.replace(delta);
88                }
89            }
90
91            let rhs_poll = this.rhs_stream.as_mut().poll_next(cx);
92            let rhs_pending = rhs_poll.is_pending();
93            if let Poll::Ready(Some(rhs_item)) = rhs_poll {
94                live = true;
95                let delta = this.func.call(this.lhs_state.borrow().clone(), rhs_item);
96                if let Some(output) = this.output.as_mut() {
97                    output.merge(delta);
98                } else {
99                    this.output.replace(delta);
100                }
101            }
102
103            if rhs_pending && lhs_pending {
104                return Poll::Pending;
105            }
106
107            if !live && !rhs_pending && !lhs_pending {
108                return Poll::Ready(this.output.take());
109            }
110            // Both streams may continue to be polled EOS (`None`) on subsequent loops or calls, so they must be fused.
111        }
112    }
113}