dfir_lang/graph/ops/
prefix.rs

1use quote::quote_spanned;
2
3use super::{
4    FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
5    RANGE_1,
6};
7
8/// Given an _unbounded_ input stream, emits full prefixes of the input, of arbitrarily increasing length, in the same order.
9///
10/// Will cause additional loop iterations as long as new values arrive.
11pub const PREFIX: OperatorConstraints = OperatorConstraints {
12    name: "prefix",
13    categories: &[OperatorCategory::Fold, OperatorCategory::Windowing],
14    hard_range_inn: RANGE_1,
15    soft_range_inn: RANGE_1,
16    hard_range_out: &(0..=1),
17    soft_range_out: &(0..=1),
18    num_args: 0,
19    persistence_args: RANGE_0,
20    type_args: RANGE_0,
21    is_external_input: false,
22    has_singleton_output: true,
23    flo_type: Some(FloType::Windowing),
24    ports_inn: None,
25    ports_out: None,
26    input_delaytype_fn: |_| None,
27    write_fn: |wc @ &WriteContextArgs {
28                   context,
29                   df_ident,
30                   op_span,
31                   ident,
32                   is_pull,
33                   inputs,
34                   singleton_output_ident,
35                   ..
36               },
37               _diagnostics| {
38        assert!(is_pull);
39
40        let write_prologue = quote_spanned! {op_span=>
41            #[allow(clippy::redundant_closure_call)]
42            let #singleton_output_ident = #df_ident.add_state(
43                ::std::cell::RefCell::new(::std::vec::Vec::new())
44            );
45        };
46
47        let vec_ident = wc.make_ident("vec");
48
49        let input = &inputs[0];
50        let write_iterator = quote_spanned! {op_span=>
51            let mut #vec_ident = unsafe {
52                // SAFETY: handle from `#df_ident.add_state(..)`.
53                #context.state_ref_unchecked(#singleton_output_ident)
54            }.borrow_mut();
55            ::std::iter::Extend::extend(&mut *#vec_ident, #input);
56            let #ident = ::std::iter::IntoIterator::into_iter(::std::clone::Clone::clone(&*#vec_ident));
57        };
58        let write_iterator_after = quote_spanned! {op_span=>
59            #context.allow_another_iteration();
60        };
61
62        Ok(OperatorWriteOutput {
63            write_prologue,
64            write_iterator,
65            write_iterator_after,
66        })
67    },
68};