dfir_lang/graph/ops/
repeat_n.rs

1use quote::quote_spanned;
2
3use super::{
4    FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
5    WriteContextArgs,
6};
7
8/// Given a _bounded_ input stream, emits all values repeatedly over `N` iterations, in the same order.
9///
10/// Will cause `N` loop iterations.
11pub const REPEAT_N: OperatorConstraints = OperatorConstraints {
12    name: "repeat_n",
13    categories: &[OperatorCategory::Windowing],
14    hard_range_inn: RANGE_1,
15    soft_range_inn: RANGE_1,
16    hard_range_out: RANGE_1,
17    soft_range_out: RANGE_1,
18    num_args: 1,
19    persistence_args: RANGE_0,
20    type_args: RANGE_0,
21    is_external_input: false,
22    has_singleton_output: true,
23    flo_type: Some(FloType::Windowing),
24    ports_inn: None,
25    ports_out: None,
26    input_delaytype_fn: |_| None,
27    write_fn: |wc @ &WriteContextArgs {
28                   context,
29                   df_ident,
30                   op_span,
31                   arguments,
32                   ident,
33                   is_pull,
34                   inputs,
35                   singleton_output_ident,
36                   ..
37               },
38               _diagnostics| {
39        assert!(is_pull);
40
41        let write_prologue = quote_spanned! {op_span=>
42            #[allow(clippy::redundant_closure_call)]
43            let #singleton_output_ident = #df_ident.add_state(
44                ::std::cell::RefCell::new(::std::vec::Vec::new())
45            );
46        };
47
48        let vec_ident = wc.make_ident("vec");
49
50        let input = &inputs[0];
51        let write_iterator = quote_spanned! {op_span=>
52            let mut #vec_ident = unsafe {
53                // SAFETY: handle from `#df_ident.add_state(..)`.
54                #context.state_ref_unchecked(#singleton_output_ident)
55            }.borrow_mut();
56
57            if 0 == #context.loop_iter_count() {
58                *#vec_ident = #input.collect::<::std::vec::Vec<_>>();
59            }
60            let #ident = std::iter::IntoIterator::into_iter(::std::clone::Clone::clone(&*#vec_ident));
61        };
62
63        // Reschedule, to repeat.
64        let count_arg = &arguments[0];
65        let write_iterator_after = quote_spanned! {op_span=>
66            {
67                if #context.loop_iter_count() + 1 < #count_arg {
68                    #context.reschedule_loop_block();
69                }
70            }
71        };
72
73        Ok(OperatorWriteOutput {
74            write_prologue,
75            write_iterator,
76            write_iterator_after,
77            ..Default::default()
78        })
79    },
80};