dfir_lang/graph/ops/
zip.rs

1use quote::quote_spanned;
2use syn::parse_quote;
3
4use super::{
5    OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
6};
7
8/// > 2 input streams of type `V1` and `V2`, 1 output stream of type `(V1, V2)`
9///
10/// Zips the streams together, forming paired tuples of the inputs. Note that zipping is done per-tick. If you do not
11/// want to discard the excess, use [`zip_longest`](#zip_longest) instead.
12///
13/// Takes in up to two generic lifetime persistence argument, one for each input. Within the lifetime, excess items
14/// from one input or the other will be discarded. Using a `'static` persistence lifetime may result in unbounded
15/// buffering if the rates are mismatched.
16///
17/// ```dfir
18/// source_iter(0..3) -> [0]my_zip;
19/// source_iter(0..5) -> [1]my_zip;
20/// my_zip = zip() -> assert_eq([(0, 0), (1, 1), (2, 2)]);
21/// ```
22pub const ZIP: OperatorConstraints = OperatorConstraints {
23    name: "zip",
24    categories: &[OperatorCategory::MultiIn],
25    hard_range_inn: &(2..=2),
26    soft_range_inn: &(2..=2),
27    hard_range_out: RANGE_1,
28    soft_range_out: RANGE_1,
29    num_args: 0,
30    persistence_args: &(0..=2),
31    type_args: RANGE_0,
32    is_external_input: false,
33    has_singleton_output: false,
34    flo_type: None,
35    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
36    ports_out: None,
37    input_delaytype_fn: |_| None,
38    write_fn: |wc @ &WriteContextArgs {
39                   root,
40                   context,
41                   df_ident,
42                   op_span,
43                   ident,
44                   is_pull,
45                   inputs,
46                   ..
47               },
48               diagnostics| {
49        assert!(is_pull);
50
51        let [lhs_persistence, rhs_persistence] = wc.persistence_args_disallow_mutable(diagnostics);
52
53        let lhs_ident = wc.make_ident("lhs");
54        let rhs_ident = wc.make_ident("rhs");
55
56        let write_prologue = quote_spanned! {op_span=>
57            let #lhs_ident = #df_ident.add_state(::std::cell::RefCell::new(::std::vec::Vec::new()));
58            let #rhs_ident = #df_ident.add_state(::std::cell::RefCell::new(::std::vec::Vec::new()));
59        };
60
61        let write_prologue_after_lhs = wc
62            .persistence_as_state_lifespan(lhs_persistence)
63            .map(|lifespan| {
64                quote_spanned! {op_span=>
65                    #df_ident.set_state_lifespan_hook(#lhs_ident, #lifespan, |rcell| { rcell.borrow_mut().clear(); });
66                }
67            });
68        let write_prologue_after_rhs = wc
69            .persistence_as_state_lifespan(rhs_persistence)
70            .map(|lifespan| {
71                quote_spanned! {op_span=>
72                    #df_ident.set_state_lifespan_hook(#rhs_ident, #lifespan, |rcell| { rcell.borrow_mut().clear(); });
73                }
74            });
75
76        let lhs_input = &inputs[0];
77        let rhs_input = &inputs[1];
78        let write_iterator = quote_spanned! {op_span=>
79            let #ident = {
80                let (mut lhs_buf, mut rhs_buf) = unsafe {
81                    // SAFETY: handle from `#df_ident.add_state(..)`.
82                    (
83                        #context.state_ref_unchecked(#lhs_ident).borrow_mut(),
84                        #context.state_ref_unchecked(#rhs_ident).borrow_mut(),
85                    )
86                };
87
88                #root::itertools::Itertools::zip_longest(
89                    ::std::mem::take(&mut *lhs_buf).into_iter().chain(#lhs_input),
90                    ::std::mem::take(&mut *rhs_buf).into_iter().chain(#rhs_input),
91                )
92                    .filter_map(move |either| {
93                        match either {
94                            #root::itertools::EitherOrBoth::Both(lhs, rhs) => {
95                                return Some((lhs, rhs));
96                            },
97                            #root::itertools::EitherOrBoth::Left(lhs) => lhs_buf.push(lhs),
98                            #root::itertools::EitherOrBoth::Right(rhs) => rhs_buf.push(rhs),
99                        }
100                        None
101                    })
102            };
103        };
104
105        Ok(OperatorWriteOutput {
106            write_prologue,
107            write_prologue_after: quote_spanned! {op_span=>
108                #write_prologue_after_lhs
109                #write_prologue_after_rhs
110            },
111            write_iterator,
112            ..Default::default()
113        })
114    },
115};