dfir_lang/graph/ops/
_counter.rs
1use quote::quote_spanned;
2
3use super::{
4 OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
5};
6
7pub const _COUNTER: OperatorConstraints = OperatorConstraints {
35 name: "_counter",
36 categories: &[OperatorCategory::Map],
37 hard_range_inn: RANGE_1,
38 soft_range_inn: RANGE_1,
39 hard_range_out: &(0..=1),
40 soft_range_out: &(0..=1),
41 num_args: 2,
42 persistence_args: RANGE_0,
43 type_args: RANGE_0,
44 is_external_input: false,
45 has_singleton_output: false,
46 flo_type: None,
47 ports_inn: None,
48 ports_out: None,
49 input_delaytype_fn: |_| None,
50 write_fn: |wc @ &WriteContextArgs {
51 root,
52 df_ident,
53 op_span,
54 ident,
55 inputs,
56 outputs,
57 is_pull,
58 arguments,
59 ..
60 },
61 _| {
62 let read_ident = wc.make_ident("read");
63 let write_ident = wc.make_ident("write");
64
65 let tag_expr = &arguments[0];
66 let tag_ident = wc.make_ident("tag");
67 let duration_expr = &arguments[1];
68 let duration_ident = wc.make_ident("duration");
69
70 let write_prologue = quote_spanned! {op_span=>
71 let #write_ident = ::std::rc::Rc::new(::std::cell::Cell::new(0));
72
73 let #read_ident = ::std::rc::Rc::clone(&#write_ident);
74 let #duration_ident = #duration_expr;
75 let #tag_ident = #tag_expr;
76 #df_ident.request_task(async move {
77 loop {
78 println!("_counter({}): {}", #tag_ident, #read_ident.get());
79 #root::tokio::time::sleep(#duration_ident).await;
80 }
81 });
82 };
83
84 let count_ident = wc.make_ident("count");
85 let write_iterator = if is_pull {
86 let input = &inputs[0];
87 quote_spanned! {op_span=>
88 let #ident = #input.inspect(|_| { #count_ident += 1; });
89 }
90 } else if outputs.is_empty() {
91 quote_spanned! {op_span=>
92 let #ident = #root::pusherator::inspect::Inspect::new(|_| { #count_ident += 1; }, #root::pusherator::null::Null::new());
93 }
94 } else {
95 let output = &outputs[0];
96 quote_spanned! {op_span=>
97 let #ident = #root::pusherator::inspect::Inspect::new(|_| { #count_ident += 1; }, #output);
98 }
99 };
100 let write_iterator = quote_spanned! {op_span=>
101 let mut #count_ident = 0;
102 #write_iterator
103 };
104
105 let write_iterator_after = quote_spanned! {op_span=>
106 #write_ident.set(#write_ident.get() + #count_ident);
107 };
108
109 Ok(OperatorWriteOutput {
110 write_prologue,
111 write_iterator,
112 write_iterator_after,
113 })
114 },
115};