dfir_lang/graph/ops/
enumerate.rs

1use quote::quote_spanned;
2
3use super::{
4    OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
5};
6
7/// > 1 input stream of type `T`, 1 output stream of type `(usize, T)`
8///
9/// For each item passed in, enumerate it with its index: `(0, x_0)`, `(1, x_1)`, etc.
10///
11/// `enumerate` can also be provided with one generic lifetime persistence argument, either
12/// `'tick` or `'static`, to specify if indexing resets. If `'tick` (the default) is specified, indexing will
13/// restart at zero at the start of each tick. Otherwise `'static` will never reset
14/// and count monotonically upwards.
15///
16/// ```dfir
17/// source_iter(vec!["hello", "world"])
18///     -> enumerate()
19///     -> assert_eq([(0, "hello"), (1, "world")]);
20/// ```
21pub const ENUMERATE: OperatorConstraints = OperatorConstraints {
22    name: "enumerate",
23    categories: &[OperatorCategory::Map],
24    hard_range_inn: RANGE_1,
25    soft_range_inn: RANGE_1,
26    hard_range_out: RANGE_1,
27    soft_range_out: RANGE_1,
28    num_args: 0,
29    persistence_args: &(0..=1),
30    type_args: RANGE_0,
31    is_external_input: false,
32    has_singleton_output: false,
33    flo_type: None,
34    ports_inn: None,
35    ports_out: None,
36    input_delaytype_fn: |_| None,
37    write_fn: |wc @ &WriteContextArgs {
38                   root,
39                   op_span,
40                   context,
41                   df_ident,
42                   ident,
43                   inputs,
44                   outputs,
45                   is_pull,
46                   ..
47               },
48               diagnostics| {
49        let [persistence] = wc.persistence_args_disallow_mutable(diagnostics);
50
51        let input = &inputs[0];
52        let output = &outputs[0];
53
54        let counter_ident = wc.make_ident("counterdata");
55
56        let write_prologue = quote_spanned! {op_span=>
57            let #counter_ident = #df_ident.add_state(::std::cell::RefCell::new(0..));
58        };
59        let write_prologue_after = wc
60            .persistence_as_state_lifespan(persistence)
61            .map(|lifespan| quote_spanned! {op_span=>
62                #df_ident.set_state_lifespan_hook(#counter_ident, #lifespan, |rcell| { rcell.replace(0..); });
63            }).unwrap_or_default();
64
65        let map_fn = quote_spanned! {op_span=>
66            |item| {
67                let mut counter = unsafe {
68                    // SAFETY: handle from `#df_ident.add_state(..)`.
69                    #context.state_ref_unchecked(#counter_ident)
70                }.borrow_mut();
71                (counter.next().unwrap(), item)
72            }
73        };
74        let write_iterator = if is_pull {
75            quote_spanned! {op_span=>
76                let #ident = ::std::iter::Iterator::map(#input, #map_fn);
77            }
78        } else {
79            quote_spanned! {op_span=>
80                let #ident = #root::pusherator::map::Map::new(#map_fn, #output);
81            }
82        };
83
84        Ok(OperatorWriteOutput {
85            write_prologue,
86            write_prologue_after,
87            write_iterator,
88            ..Default::default()
89        })
90    },
91};