dfir_lang/graph/ops/
defer_signal.rs

1use quote::quote_spanned;
2use syn::parse_quote;
3
4use super::{
5    DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput,
6    WriteContextArgs, RANGE_0, RANGE_1,
7};
8
9/// > 2 input streams, 1 output stream, no arguments.
10///
11/// Defers streaming input and releases it downstream when a signal is delivered. The order of input is preserved. This allows for buffering data and delivering it at a later, chosen, tick.
12///
13/// There are two inputs to `defer_signal`, they are `input` and `signal`.
14/// `input` is the input data flow. Data that is delivered on this input is collected in order inside of the `defer_signal` operator.
15/// When anything is sent to `signal` the collected data is released downstream. The entire `signal` input is consumed each tick, so sending 5 things on `signal` will not release inputs on the next 5 consecutive ticks.
16///
17/// ```dfir
18/// gate = defer_signal();
19///
20/// source_iter([1, 2, 3]) -> [input]gate;
21/// source_iter([()]) -> [signal]gate;
22///
23/// gate -> assert_eq([1, 2, 3]);
24/// ```
25pub const DEFER_SIGNAL: OperatorConstraints = OperatorConstraints {
26    name: "defer_signal",
27    categories: &[OperatorCategory::Persistence],
28    persistence_args: RANGE_0,
29    type_args: RANGE_0,
30    hard_range_inn: &(2..=2),
31    soft_range_inn: &(2..=2),
32    hard_range_out: RANGE_1,
33    soft_range_out: RANGE_1,
34    num_args: 0,
35    is_external_input: false,
36    has_singleton_output: false,
37    flo_type: None,
38    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, signal })),
39    ports_out: None,
40    input_delaytype_fn: |_| Some(DelayType::Stratum),
41    write_fn: |wc @ &WriteContextArgs {
42                   context,
43                   df_ident,
44                   ident,
45                   op_span,
46                   inputs,
47                   is_pull,
48                   ..
49               },
50               _| {
51        assert!(is_pull);
52
53        let internal_buffer = wc.make_ident("internal_buffer");
54        let borrow_ident = wc.make_ident("borrow_ident");
55
56        let write_prologue = quote_spanned! {op_span=>
57            let #internal_buffer = #df_ident.add_state(::std::cell::RefCell::new(::std::vec::Vec::new()));
58        };
59
60        let input = &inputs[0];
61        let signal = &inputs[1];
62
63        let write_iterator = {
64            quote_spanned! {op_span=>
65
66                let mut #borrow_ident = unsafe {
67                    // SAFETY: handle from `#df_ident.add_state(..)`.
68                    #context.state_ref_unchecked(#internal_buffer)
69                }.borrow_mut();
70
71                #borrow_ident.extend(#input);
72
73                let #ident = if #signal.count() > 0 {
74                    ::std::option::Option::Some(#borrow_ident.drain(..))
75                } else {
76                    ::std::option::Option::None
77                }.into_iter().flatten();
78            }
79        };
80
81        Ok(OperatorWriteOutput {
82            write_prologue,
83            write_iterator,
84            write_iterator_after: Default::default(),
85            ..Default::default()
86        })
87    },
88};