dfir_lang/graph/ops/
zip.rs
1use quote::quote_spanned;
2use syn::parse_quote;
3
4use super::{
5 OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput,
6 Persistence, WriteContextArgs, RANGE_0, RANGE_1,
7};
8use crate::diagnostic::{Diagnostic, Level};
9
10pub const ZIP: OperatorConstraints = OperatorConstraints {
22 name: "zip",
23 categories: &[OperatorCategory::MultiIn],
24 hard_range_inn: &(2..=2),
25 soft_range_inn: &(2..=2),
26 hard_range_out: RANGE_1,
27 soft_range_out: RANGE_1,
28 num_args: 0,
29 persistence_args: &(0..=1),
30 type_args: RANGE_0,
31 is_external_input: false,
32 has_singleton_output: false,
33 flo_type: None,
34 ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
35 ports_out: None,
36 input_delaytype_fn: |_| None,
37 write_fn: |wc @ &WriteContextArgs {
38 root,
39 context,
40 df_ident,
41 op_span,
42 ident,
43 is_pull,
44 inputs,
45 op_name,
46 op_inst:
47 OperatorInstance {
48 generics:
49 OpInstGenerics {
50 persistence_args, ..
51 },
52 ..
53 },
54 ..
55 },
56 diagnostics| {
57 assert!(is_pull);
58
59 let persistence = match persistence_args[..] {
60 [] => Persistence::Tick,
61 [a] => a,
62 _ => unreachable!(),
63 };
64 if Persistence::Tick != persistence {
65 diagnostics.push(Diagnostic::spanned(
66 op_span,
67 Level::Error,
68 format!("`{}()` can only have `'tick` persistence.", op_name),
69 ));
70 }
72
73 let zipbuf_ident = wc.make_ident("zipbuf");
74
75 let write_prologue = quote_spanned! {op_span=>
76 let #zipbuf_ident = #df_ident.add_state(::std::cell::RefCell::new(
77 #root::util::monotonic_map::MonotonicMap::<
78 #root::scheduled::ticks::TickInstant,
79 (::std::vec::Vec<_>, ::std::vec::Vec<_>),
80 >::default()
81 ));
82 };
83
84 let lhs = &inputs[0];
85 let rhs = &inputs[1];
86 let write_iterator = quote_spanned! {op_span=>
87 let #ident = {
88 let mut zipbuf_borrow = unsafe {
90 #context.state_ref_unchecked(#zipbuf_ident)
92 }.borrow_mut();
93 let (lhs_buf, rhs_buf) = zipbuf_borrow.get_mut_default(#context.current_tick());
94 #root::itertools::Itertools::zip_longest(
95 ::std::mem::take(lhs_buf).into_iter().chain(#lhs),
96 ::std::mem::take(rhs_buf).into_iter().chain(#rhs),
97 )
98 .filter_map(|either| {
99 if let #root::itertools::EitherOrBoth::Both(lhs, rhs) = either {
100 Some((lhs, rhs))
101 } else {
102 let mut zipbuf_burrow = unsafe {
103 #context.state_ref_unchecked(#zipbuf_ident)
105 }.borrow_mut();
106 let (lhs_buf, rhs_buf) = zipbuf_burrow.get_mut_default(#context.current_tick());
107 match either {
108 #root::itertools::EitherOrBoth::Left(lhs) => lhs_buf.push(lhs),
109 #root::itertools::EitherOrBoth::Right(rhs) => rhs_buf.push(rhs),
110 _ => ::std::unreachable!(),
111 }
112 None
113 }
114 })
115 };
116 };
117
118 Ok(OperatorWriteOutput {
119 write_prologue,
120 write_iterator,
121 ..Default::default()
122 })
123 },
124};