dfir_lang/graph/ops/
_counter.rs

1use quote::quote_spanned;
2
3use super::{
4    OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
5};
6
7/// > Arguments: A `tag` string and a `Duration` for how long to wait between printing.
8///
9/// Counts the number of items passing through and prints to stdout whenever the stream trigger activates.
10///
11/// ```dfir
12/// source_stream(dfir_rs::util::iter_batches_stream(0..=100_000, 1))
13///     -> _counter("nums", Duration::from_millis(100));
14/// ```
15/// stdout:
16/// ```text
17/// _counter(nums): 1
18/// _counter(nums): 6202
19/// _counter(nums): 12540
20/// _counter(nums): 18876
21/// _counter(nums): 25218
22/// _counter(nums): 31557
23/// _counter(nums): 37893
24/// _counter(nums): 44220
25/// _counter(nums): 50576
26/// _counter(nums): 56909
27/// _counter(nums): 63181
28/// _counter(nums): 69549
29/// _counter(nums): 75914
30/// _counter(nums): 82263
31/// _counter(nums): 88638
32/// _counter(nums): 94980
33/// ```
34pub const _COUNTER: OperatorConstraints = OperatorConstraints {
35    name: "_counter",
36    categories: &[OperatorCategory::Map],
37    hard_range_inn: RANGE_1,
38    soft_range_inn: RANGE_1,
39    hard_range_out: &(0..=1),
40    soft_range_out: &(0..=1),
41    num_args: 2,
42    persistence_args: RANGE_0,
43    type_args: RANGE_0,
44    is_external_input: false,
45    has_singleton_output: false,
46    flo_type: None,
47    ports_inn: None,
48    ports_out: None,
49    input_delaytype_fn: |_| None,
50    write_fn: |wc @ &WriteContextArgs {
51                   root,
52                   df_ident,
53                   op_span,
54                   ident,
55                   inputs,
56                   outputs,
57                   is_pull,
58                   arguments,
59                   ..
60               },
61               _| {
62        let read_ident = wc.make_ident("read");
63        let write_ident = wc.make_ident("write");
64
65        let tag_expr = &arguments[0];
66        let tag_ident = wc.make_ident("tag");
67        let duration_expr = &arguments[1];
68        let duration_ident = wc.make_ident("duration");
69
70        let write_prologue = quote_spanned! {op_span=>
71            let #write_ident = ::std::rc::Rc::new(::std::cell::Cell::new(0));
72
73            let #read_ident = ::std::rc::Rc::clone(&#write_ident);
74            let #duration_ident = #duration_expr;
75            let #tag_ident = #tag_expr;
76            #df_ident.request_task(async move {
77                loop {
78                    println!("_counter({}): {}", #tag_ident, #read_ident.get());
79                    #root::tokio::time::sleep(#duration_ident).await;
80                }
81            });
82        };
83
84        let count_ident = wc.make_ident("count");
85        let write_iterator = if is_pull {
86            let input = &inputs[0];
87            quote_spanned! {op_span=>
88                let #ident = #input.inspect(|_| { #count_ident += 1; });
89            }
90        } else if outputs.is_empty() {
91            quote_spanned! {op_span=>
92                let #ident = #root::pusherator::inspect::Inspect::new(|_| { #count_ident += 1; }, #root::pusherator::null::Null::new());
93            }
94        } else {
95            let output = &outputs[0];
96            quote_spanned! {op_span=>
97                let #ident = #root::pusherator::inspect::Inspect::new(|_| { #count_ident += 1; }, #output);
98            }
99        };
100        let write_iterator = quote_spanned! {op_span=>
101            let mut #count_ident = 0;
102            #write_iterator
103        };
104
105        let write_iterator_after = quote_spanned! {op_span=>
106            #write_ident.set(#write_ident.get() + #count_ident);
107        };
108
109        Ok(OperatorWriteOutput {
110            write_prologue,
111            write_iterator,
112            write_iterator_after,
113        })
114    },
115};