dfir_lang/graph/ops/
cross_singleton.rs

1use quote::{ToTokens, quote_spanned};
2use syn::parse_quote;
3
4use super::{
5    DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
6    WriteContextArgs,
7};
8use crate::graph::PortIndexValue;
9
10/// > 2 input streams, 1 output stream, no arguments.
11///
12/// Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, emitting
13/// ignoring everything after the first element. This operator blocks on the singleton input, and
14/// then joins it with all the elements in the other stream if an element is present. This operator
15/// is useful when a singleton input must be used to transform elements of a stream, since unlike
16/// cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional
17/// branches, since the operator short circuits if the singleton input produces no values.
18///
19/// There are two inputs to `cross_singleton`, they are `input` and `single`.
20/// `input` is the input data flow, and `single` is the singleton input.
21///
22/// ```dfir
23/// join = cross_singleton();
24///
25/// source_iter([1, 2, 3]) -> [input]join;
26/// source_iter([0]) -> [single]join;
27///
28/// join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
29/// ```
30pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints {
31    name: "cross_singleton",
32    categories: &[OperatorCategory::MultiIn],
33    persistence_args: RANGE_0,
34    type_args: RANGE_0,
35    hard_range_inn: &(2..=2),
36    soft_range_inn: &(2..=2),
37    hard_range_out: RANGE_1,
38    soft_range_out: RANGE_1,
39    num_args: 0,
40    is_external_input: false,
41    has_singleton_output: false,
42    flo_type: None,
43    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })),
44    ports_out: None,
45    input_delaytype_fn: |idx| match idx {
46        PortIndexValue::Path(path) if "single" == path.to_token_stream().to_string() => {
47            Some(DelayType::Stratum)
48        }
49        _else => None,
50    },
51    write_fn: |wc @ &WriteContextArgs {
52                   root,
53                   context,
54                   df_ident,
55                   ident,
56                   op_span,
57                   inputs,
58                   is_pull,
59                   work_fn,
60                   ..
61               },
62               _diagnostics| {
63        assert!(is_pull);
64
65        let stream_input = &inputs[0];
66        let singleton_input = &inputs[1];
67        let singleton_handle_ident = wc.make_ident("singleton_handle");
68
69        let write_prologue = quote_spanned! {op_span=>
70            let #singleton_handle_ident = #df_ident.add_state(
71                ::std::cell::RefCell::new(::std::option::Option::None)
72            );
73            // Reset the value if it is a new tick. TODO(mingwei): handle other lifespans?
74            #df_ident.set_state_lifespan_hook(#singleton_handle_ident, #root::scheduled::graph::StateLifespan::Tick, |rcell| { rcell.take(); });
75        };
76
77        let write_iterator = quote_spanned! {op_span=>
78            let #ident = {
79                #[inline(always)]
80                fn cross_singleton_guard<Singleton, Item, SingletonIter, Stream>(
81                    mut singleton_state_mut: std::cell::RefMut<'_, Option<Singleton>>,
82                    mut singleton_input: SingletonIter,
83                    stream_input: Stream,
84                ) -> impl use<Item, Singleton, Stream, /*TODO: https://github.com/rust-lang/rust/issues/130043 */ SingletonIter> + Iterator<Item = (Item, Singleton)>
85                where
86                    Singleton: ::std::clone::Clone,
87                    SingletonIter: Iterator<Item = Singleton>,
88                    Stream: Iterator<Item = Item>,
89                {
90                    let singleton_value_opt = #work_fn(|| match &*singleton_state_mut {
91                        ::std::option::Option::Some(singleton_value) => Some(singleton_value.clone()),
92                        ::std::option::Option::None => {
93                            let singleton_value_opt = singleton_input.next();
94                            *singleton_state_mut = singleton_value_opt.clone();
95                            singleton_value_opt
96                        }
97                    });
98                    singleton_value_opt
99                        .map(|singleton_value| {
100                            stream_input.map(move |item| (item, ::std::clone::Clone::clone(&singleton_value)))
101                        })
102                        .into_iter()
103                        .flatten()
104                }
105                cross_singleton_guard(
106                    unsafe {
107                        // SAFETY: handle from `#df_ident.add_state(..)`.
108                        #context.state_ref_unchecked(#singleton_handle_ident)
109                    }.borrow_mut(),
110                    #singleton_input,
111                    #stream_input,
112                )
113            };
114        };
115
116        Ok(OperatorWriteOutput {
117            write_prologue,
118            write_iterator,
119            ..Default::default()
120        })
121    },
122};