dfir_lang/graph/ops/
cross_singleton.rs

1use quote::{ToTokens, quote_spanned};
2use syn::parse_quote;
3
4use super::{
5    DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
6    WriteContextArgs,
7};
8use crate::graph::PortIndexValue;
9
10/// > 2 input streams, 1 output stream, no arguments.
11///
12/// Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, emitting
13/// ignoring everything after the first element. This operator blocks on the singleton input, and
14/// then joins it with all the elements in the other stream if an element is present. This operator
15/// is useful when a singleton input must be used to transform elements of a stream, since unlike
16/// cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional
17/// branches, since the operator short circuits if the singleton input produces no values.
18///
19/// There are two inputs to `cross_singleton`, they are `input` and `single`.
20/// `input` is the input data flow, and `single` is the singleton input.
21///
22/// ```dfir
23/// join = cross_singleton();
24///
25/// source_iter([1, 2, 3]) -> [input]join;
26/// source_iter([0]) -> [single]join;
27///
28/// join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
29/// ```
30pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints {
31    name: "cross_singleton",
32    categories: &[OperatorCategory::MultiIn],
33    persistence_args: RANGE_0,
34    type_args: RANGE_0,
35    hard_range_inn: &(2..=2),
36    soft_range_inn: &(2..=2),
37    hard_range_out: RANGE_1,
38    soft_range_out: RANGE_1,
39    num_args: 0,
40    is_external_input: false,
41    has_singleton_output: false,
42    flo_type: None,
43    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })),
44    ports_out: None,
45    input_delaytype_fn: |idx| match idx {
46        PortIndexValue::Path(path) if "single" == path.to_token_stream().to_string() => {
47            Some(DelayType::Stratum)
48        }
49        _else => None,
50    },
51    write_fn: |wc @ &WriteContextArgs {
52                   context,
53                   df_ident,
54                   ident,
55                   op_span,
56                   inputs,
57                   is_pull,
58                   work_fn,
59                   ..
60               },
61               _diagnostics| {
62        assert!(is_pull);
63
64        let stream_input = &inputs[0];
65        let singleton_input = &inputs[1];
66        let singleton_handle_ident = wc.make_ident("singleton_handle");
67
68        let write_prologue = quote_spanned! {op_span=>
69            let #singleton_handle_ident = #df_ident.add_state(
70                ::std::cell::RefCell::new(::std::option::Option::None)
71            );
72            // Reset the value if it is a new tick.
73            #df_ident.set_state_tick_hook(#singleton_handle_ident, |rcell| { rcell.take(); });
74        };
75
76        let write_iterator = quote_spanned! {op_span=>
77            let #ident = {
78                #[inline(always)]
79                fn cross_singleton_guard<Singleton, Item, SingletonIter, Stream>(
80                    mut singleton_state_mut: std::cell::RefMut<'_, Option<Singleton>>,
81                    mut singleton_input: SingletonIter,
82                    stream_input: Stream,
83                ) -> impl use<Item, Singleton, Stream, /*TODO: https://github.com/rust-lang/rust/issues/130043 */ SingletonIter> + Iterator<Item = (Item, Singleton)>
84                where
85                    Singleton: ::std::clone::Clone,
86                    SingletonIter: Iterator<Item = Singleton>,
87                    Stream: Iterator<Item = Item>,
88                {
89                    let singleton_value_opt = #work_fn(|| match &*singleton_state_mut {
90                        ::std::option::Option::Some(singleton_value) => Some(singleton_value.clone()),
91                        ::std::option::Option::None => {
92                            let singleton_value_opt = singleton_input.next();
93                            *singleton_state_mut = singleton_value_opt.clone();
94                            singleton_value_opt
95                        }
96                    });
97                    singleton_value_opt
98                        .map(|singleton_value| {
99                            stream_input.map(move |item| (item, ::std::clone::Clone::clone(&singleton_value)))
100                        })
101                        .into_iter()
102                        .flatten()
103                }
104                cross_singleton_guard(
105                    unsafe {
106                        // SAFETY: handle from `#df_ident.add_state(..)`.
107                        #context.state_ref_unchecked(#singleton_handle_ident)
108                    }.borrow_mut(),
109                    #singleton_input,
110                    #stream_input,
111                )
112            };
113        };
114
115        Ok(OperatorWriteOutput {
116            write_prologue,
117            write_iterator,
118            ..Default::default()
119        })
120    },
121};