dfir_lang/graph/ops/
batch.rs

1use quote::quote_spanned;
2
3use super::{
4    identity_write_iterator_fn, FloType, OperatorCategory, OperatorConstraints,
5    OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
6};
7
8/// Given an _unbounded_ input stream, emits values arbitrarily split into batches over multiple iterations in the same order.
9///
10/// Will cause additional loop iterations as long as new values arrive.
11pub const BATCH: OperatorConstraints = OperatorConstraints {
12    name: "batch",
13    categories: &[OperatorCategory::Windowing],
14    hard_range_inn: RANGE_1,
15    soft_range_inn: RANGE_1,
16    hard_range_out: RANGE_1,
17    soft_range_out: RANGE_1,
18    num_args: 0,
19    persistence_args: RANGE_0,
20    type_args: RANGE_0,
21    is_external_input: false,
22    has_singleton_output: true,
23    flo_type: Some(FloType::Windowing),
24    ports_inn: None,
25    ports_out: None,
26    input_delaytype_fn: |_| None,
27    // Scheduler automatically handles the batching of values as this is a `OperatorCategory::Windowing` operator.
28    write_fn: |wc @ &WriteContextArgs {
29                   context, op_span, ..
30               },
31               _diagnostics| {
32        let write_iterator = identity_write_iterator_fn(wc);
33        let write_iterator_after = quote_spanned! {op_span=>
34            #context.allow_another_iteration();
35        };
36        Ok(OperatorWriteOutput {
37            write_iterator,
38            write_iterator_after,
39            ..Default::default()
40        })
41    },
42};