dfir_lang/graph/ops/
union.rs

1use quote::{quote_spanned, ToTokens};
2
3use super::{
4    OperatorCategory, OperatorConstraints, OperatorWriteOutput,
5    WriteContextArgs, RANGE_0, RANGE_1, RANGE_ANY,
6};
7
8/// > *n* input streams of the same type, 1 output stream of the same type
9///
10/// Unions an arbitrary number of input streams into a single stream. Each input sequence is a subsequence of the output, but no guarantee is given on how the inputs are interleaved.
11///
12/// Since `union` has multiple input streams, it needs to be assigned to
13/// a variable to reference its multiple input ports across statements.
14///
15/// ```dfir
16/// source_iter(vec!["hello", "world"]) -> my_union;
17/// source_iter(vec!["stay", "gold"]) -> my_union;
18/// source_iter(vec!["don't", "give", "up"]) -> my_union;
19/// my_union = union()
20///     -> map(|x| x.to_uppercase())
21///     -> assert_eq(["HELLO", "WORLD", "STAY", "GOLD", "DON'T", "GIVE", "UP"]);
22/// ```
23pub const UNION: OperatorConstraints = OperatorConstraints {
24    name: "union",
25    categories: &[OperatorCategory::MultiIn],
26    hard_range_inn: RANGE_ANY,
27    soft_range_inn: &(2..),
28    hard_range_out: RANGE_1,
29    soft_range_out: RANGE_1,
30    num_args: 0,
31    persistence_args: RANGE_0,
32    type_args: RANGE_0,
33    is_external_input: false,
34    has_singleton_output: false,
35    flo_type: None,
36    ports_inn: None,
37    ports_out: None,
38    input_delaytype_fn: |_| None,
39    write_fn: |&WriteContextArgs {
40                   op_span,
41                   ident,
42                   inputs,
43                   outputs,
44                   is_pull,
45                   ..
46               },
47               _| {
48        let write_iterator = if is_pull {
49            let chains = inputs
50                .iter()
51                .map(|i| i.to_token_stream())
52                .reduce(|a, b| quote_spanned! {op_span=> check_inputs(#a, #b) })
53                .unwrap_or_else(|| quote_spanned! {op_span=> std::iter::empty() });
54            quote_spanned! {op_span=>
55                let #ident = {
56                    #[allow(unused)]
57                    #[inline(always)]
58                    fn check_inputs<A: ::std::iter::Iterator<Item = Item>, B: ::std::iter::Iterator<Item = Item>, Item>(a: A, b: B) -> impl ::std::iter::Iterator<Item = Item> {
59                        a.chain(b)
60                    }
61                    #chains
62                };
63            }
64        } else {
65            assert_eq!(1, outputs.len());
66            let output = &outputs[0];
67            quote_spanned! {op_span=>
68                let #ident = #output;
69            }
70        };
71        Ok(OperatorWriteOutput {
72            write_iterator,
73            ..Default::default()
74        })
75    },
76};