dfir_rs/compiled/pull/
lattice_bimorphism.rs1use std::cell::RefCell;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::Stream;
6use futures::stream::FusedStream;
7use lattices::{LatticeBimorphism, Merge};
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[must_use = "streams do nothing unless polled"]
13 pub struct LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output> {
14 #[pin]
15 lhs_stream: LhsStream,
16 #[pin]
17 rhs_stream: RhsStream,
18
19 func: Func,
20
21 lhs_state: &'a RefCell<LhsState>,
22 rhs_state: &'a RefCell<RhsState>,
23
24 output: Option<Output>,
25 }
26}
27
28impl<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
29 LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
30where
31 Func: 'a
32 + LatticeBimorphism<LhsState, RhsStream::Item, Output = Output>
33 + LatticeBimorphism<LhsStream::Item, RhsState, Output = Output>,
34 LhsStream: 'a + FusedStream,
35 RhsStream: 'a + FusedStream,
36 LhsState: 'static + Clone,
37 RhsState: 'static + Clone,
38 Output: Merge<Output>,
39{
40 pub fn new(
42 lhs_stream: LhsStream,
43 rhs_stream: RhsStream,
44 func: Func,
45 lhs_state: &'a RefCell<LhsState>,
46 rhs_state: &'a RefCell<RhsState>,
47 ) -> Self {
48 Self {
49 lhs_stream,
50 rhs_stream,
51 func,
52 lhs_state,
53 rhs_state,
54 output: None,
55 }
56 }
57}
58
59impl<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output> Stream
60 for LatticeBimorphismStream<'a, Func, LhsStream, RhsStream, LhsState, RhsState, Output>
61where
62 Func: 'a
63 + LatticeBimorphism<LhsState, RhsStream::Item, Output = Output>
64 + LatticeBimorphism<LhsStream::Item, RhsState, Output = Output>,
65 LhsStream: 'a + FusedStream,
66 RhsStream: 'a + FusedStream,
67 LhsState: 'static + Clone,
68 RhsState: 'static + Clone,
69 Output: Merge<Output>,
70{
71 type Item = Output;
72
73 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74 let mut this = self.project();
75
76 loop {
77 let lhs_poll = this.lhs_stream.as_mut().poll_next(cx);
78 let lhs_pending = lhs_poll.is_pending();
79 let mut live = false;
80
81 if let Poll::Ready(Some(lhs_item)) = lhs_poll {
82 live = true;
83 let delta = this.func.call(lhs_item, this.rhs_state.borrow().clone());
84 if let Some(output) = this.output.as_mut() {
85 output.merge(delta);
86 } else {
87 this.output.replace(delta);
88 }
89 }
90
91 let rhs_poll = this.rhs_stream.as_mut().poll_next(cx);
92 let rhs_pending = rhs_poll.is_pending();
93 if let Poll::Ready(Some(rhs_item)) = rhs_poll {
94 live = true;
95 let delta = this.func.call(this.lhs_state.borrow().clone(), rhs_item);
96 if let Some(output) = this.output.as_mut() {
97 output.merge(delta);
98 } else {
99 this.output.replace(delta);
100 }
101 }
102
103 if rhs_pending && lhs_pending {
104 return Poll::Pending;
105 }
106
107 if !live && !rhs_pending && !lhs_pending {
108 return Poll::Ready(this.output.take());
109 }
110 }
112 }
113}