dfir_rs/compiled/pull/
cross_singleton.rs

1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use futures::stream::Stream;
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Stream combinator that crosses each item from `item_stream` with a singleton value from `singleton_stream`.
9    pub struct CrossSingleton<'a, ItemSt, SingletonSt>
10    where
11        ItemSt: Stream,
12        SingletonSt: Stream,
13    SingletonSt::Item: Clone,
14    {
15        #[pin]
16        item_stream: ItemSt,
17        #[pin]
18        singleton_stream: SingletonSt,
19
20        singleton_state: &'a mut Option<SingletonSt::Item>,
21    }
22}
23
24impl<'a, ItemSt, SingletonSt> CrossSingleton<'a, ItemSt, SingletonSt>
25where
26    ItemSt: Stream,
27    SingletonSt: Stream,
28    SingletonSt::Item: Clone,
29{
30    /// Creates a new `CrossSingleton` stream combinator.
31    pub fn new(
32        item_stream: ItemSt,
33        singleton_stream: SingletonSt,
34        singleton_state: &'a mut Option<SingletonSt::Item>,
35    ) -> Self {
36        Self {
37            item_stream,
38            singleton_stream,
39            singleton_state,
40        }
41    }
42}
43
44impl<'a, ItemSt, SingletonSt> Stream for CrossSingleton<'a, ItemSt, SingletonSt>
45where
46    ItemSt: Stream,
47    SingletonSt: Stream,
48    SingletonSt::Item: Clone,
49{
50    type Item = (ItemSt::Item, SingletonSt::Item);
51
52    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        let this = self.project();
54
55        // Set the singleton state only if it is not already set.
56        // This short-circuits the `SingletonSt` side to the first item only.
57        let singleton = match this.singleton_state {
58            Some(singleton) => singleton,
59            None => {
60                let Some(singleton) = ready!(this.singleton_stream.poll_next(cx)) else {
61                    // If `singleton_stream` returns EOS (`None`), we return EOS, no fused needed.
62                    // This short-circuits the `ItemSt` side, dropping them.
63                    return Poll::Ready(None);
64                };
65                this.singleton_state.insert(singleton)
66            }
67        };
68
69        // Stream any items.
70        let item = ready!(this.item_stream.poll_next(cx));
71        // If `item_stream` returns EOS (`None`), we return EOS, no fused needed.
72        let pair = item.map(|item| (item, singleton.clone()));
73        Poll::Ready(pair)
74    }
75}