dfir_rs/compiled/pull/
cross_singleton.rs1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use futures::stream::Stream;
5use pin_project_lite::pin_project;
6
7pin_project! {
8 pub struct CrossSingleton<'a, ItemSt, SingletonSt>
10 where
11 ItemSt: Stream,
12 SingletonSt: Stream,
13 SingletonSt::Item: Clone,
14 {
15 #[pin]
16 item_stream: ItemSt,
17 #[pin]
18 singleton_stream: SingletonSt,
19
20 singleton_state: &'a mut Option<SingletonSt::Item>,
21 }
22}
23
24impl<'a, ItemSt, SingletonSt> CrossSingleton<'a, ItemSt, SingletonSt>
25where
26 ItemSt: Stream,
27 SingletonSt: Stream,
28 SingletonSt::Item: Clone,
29{
30 pub fn new(
32 item_stream: ItemSt,
33 singleton_stream: SingletonSt,
34 singleton_state: &'a mut Option<SingletonSt::Item>,
35 ) -> Self {
36 Self {
37 item_stream,
38 singleton_stream,
39 singleton_state,
40 }
41 }
42}
43
44impl<'a, ItemSt, SingletonSt> Stream for CrossSingleton<'a, ItemSt, SingletonSt>
45where
46 ItemSt: Stream,
47 SingletonSt: Stream,
48 SingletonSt::Item: Clone,
49{
50 type Item = (ItemSt::Item, SingletonSt::Item);
51
52 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 let this = self.project();
54
55 let singleton = match this.singleton_state {
58 Some(singleton) => singleton,
59 None => {
60 let Some(singleton) = ready!(this.singleton_stream.poll_next(cx)) else {
61 return Poll::Ready(None);
64 };
65 this.singleton_state.insert(singleton)
66 }
67 };
68
69 let item = ready!(this.item_stream.poll_next(cx));
71 let pair = item.map(|item| (item, singleton.clone()));
73 Poll::Ready(pair)
74 }
75}