dfir_lang/graph/ops/
zip.rs
1use quote::quote_spanned;
2use syn::parse_quote;
3
4use super::{
5 OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
6};
7
8pub const ZIP: OperatorConstraints = OperatorConstraints {
23 name: "zip",
24 categories: &[OperatorCategory::MultiIn],
25 hard_range_inn: &(2..=2),
26 soft_range_inn: &(2..=2),
27 hard_range_out: RANGE_1,
28 soft_range_out: RANGE_1,
29 num_args: 0,
30 persistence_args: &(0..=2),
31 type_args: RANGE_0,
32 is_external_input: false,
33 has_singleton_output: false,
34 flo_type: None,
35 ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
36 ports_out: None,
37 input_delaytype_fn: |_| None,
38 write_fn: |wc @ &WriteContextArgs {
39 root,
40 context,
41 df_ident,
42 op_span,
43 ident,
44 is_pull,
45 inputs,
46 ..
47 },
48 diagnostics| {
49 assert!(is_pull);
50
51 let [lhs_persistence, rhs_persistence] = wc.persistence_args_disallow_mutable(diagnostics);
52
53 let lhs_ident = wc.make_ident("lhs");
54 let rhs_ident = wc.make_ident("rhs");
55
56 let write_prologue = quote_spanned! {op_span=>
57 let #lhs_ident = #df_ident.add_state(::std::cell::RefCell::new(::std::vec::Vec::new()));
58 let #rhs_ident = #df_ident.add_state(::std::cell::RefCell::new(::std::vec::Vec::new()));
59 };
60
61 let write_prologue_after_lhs = wc
62 .persistence_as_state_lifespan(lhs_persistence)
63 .map(|lifespan| {
64 quote_spanned! {op_span=>
65 #df_ident.set_state_lifespan_hook(#lhs_ident, #lifespan, |rcell| { rcell.borrow_mut().clear(); });
66 }
67 });
68 let write_prologue_after_rhs = wc
69 .persistence_as_state_lifespan(rhs_persistence)
70 .map(|lifespan| {
71 quote_spanned! {op_span=>
72 #df_ident.set_state_lifespan_hook(#rhs_ident, #lifespan, |rcell| { rcell.borrow_mut().clear(); });
73 }
74 });
75
76 let lhs_input = &inputs[0];
77 let rhs_input = &inputs[1];
78 let write_iterator = quote_spanned! {op_span=>
79 let #ident = {
80 let (mut lhs_buf, mut rhs_buf) = unsafe {
81 (
83 #context.state_ref_unchecked(#lhs_ident).borrow_mut(),
84 #context.state_ref_unchecked(#rhs_ident).borrow_mut(),
85 )
86 };
87
88 #root::itertools::Itertools::zip_longest(
89 ::std::mem::take(&mut *lhs_buf).into_iter().chain(#lhs_input),
90 ::std::mem::take(&mut *rhs_buf).into_iter().chain(#rhs_input),
91 )
92 .filter_map(move |either| {
93 match either {
94 #root::itertools::EitherOrBoth::Both(lhs, rhs) => {
95 return Some((lhs, rhs));
96 },
97 #root::itertools::EitherOrBoth::Left(lhs) => lhs_buf.push(lhs),
98 #root::itertools::EitherOrBoth::Right(rhs) => rhs_buf.push(rhs),
99 }
100 None
101 })
102 };
103 };
104
105 Ok(OperatorWriteOutput {
106 write_prologue,
107 write_prologue_after: quote_spanned! {op_span=>
108 #write_prologue_after_lhs
109 #write_prologue_after_rhs
110 },
111 write_iterator,
112 ..Default::default()
113 })
114 },
115};