dfir_lang/graph/ops/
repeat_n.rs

1use quote::quote_spanned;
2
3use super::{
4    FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
5    RANGE_1,
6};
7
8/// Given a _bounded_ input stream, emits all values repeatedly over `N` iterations, in the same order.
9///
10/// Will cause `N` loop iterations.
11pub const REPEAT_N: OperatorConstraints = OperatorConstraints {
12    name: "repeat_n",
13    categories: &[OperatorCategory::Windowing],
14    hard_range_inn: RANGE_1,
15    soft_range_inn: RANGE_1,
16    hard_range_out: RANGE_1,
17    soft_range_out: RANGE_1,
18    num_args: 1,
19    persistence_args: RANGE_0,
20    type_args: RANGE_0,
21    is_external_input: false,
22    has_singleton_output: true,
23    flo_type: Some(FloType::Windowing),
24    ports_inn: None,
25    ports_out: None,
26    input_delaytype_fn: |_| None,
27    write_fn: |wc @ &WriteContextArgs {
28                   context,
29                   df_ident,
30                   op_span,
31                   arguments,
32                   ident,
33                   is_pull,
34                   inputs,
35                   singleton_output_ident,
36                   ..
37               },
38               _diagnostics| {
39        assert!(is_pull);
40
41        let write_prologue = quote_spanned! {op_span=>
42            #[allow(clippy::redundant_closure_call)]
43            let #singleton_output_ident = #df_ident.add_state(
44                ::std::cell::RefCell::new(::std::vec::Vec::new())
45            );
46
47            // TODO(mingwei): Is this needed?
48            // Reset the value to the initializer fn if it is a new tick.
49            #df_ident.set_state_tick_hook(#singleton_output_ident, move |rcell| { rcell.take(); });
50        };
51
52        let vec_ident = wc.make_ident("vec");
53
54        let input = &inputs[0];
55        let write_iterator = quote_spanned! {op_span=>
56            let mut #vec_ident = unsafe {
57                // SAFETY: handle from `#df_ident.add_state(..)`.
58                #context.state_ref_unchecked(#singleton_output_ident)
59            }.borrow_mut();
60
61            if 0 == #context.loop_iter_count() {
62                *#vec_ident = #input.collect::<::std::vec::Vec<_>>();
63            }
64            let #ident = std::iter::IntoIterator::into_iter(::std::clone::Clone::clone(&*#vec_ident));
65        };
66
67        // Reschedule, to repeat.
68        let count_arg = &arguments[0];
69        let write_iterator_after = quote_spanned! {op_span=>
70            {
71                if #context.loop_iter_count() + 1 < #count_arg {
72                    #context.reschedule_loop_block();
73                }
74            }
75        };
76
77        Ok(OperatorWriteOutput {
78            write_prologue,
79            write_iterator,
80            write_iterator_after,
81        })
82    },
83};