dfir_lang/graph/ops/
join_multiset.rs

1use syn::{parse_quote, parse_quote_spanned};
2
3use super::{
4    OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
5    WriteContextArgs, RANGE_0, RANGE_1,
6};
7
8/// > 2 input streams of type `<(K, V1)>` and `<(K, V2)>`, 1 output stream of type `<(K, (V1, V2))>`
9///
10/// This operator is equivalent to `join` except that the LHS and RHS are collected into multisets rather than sets before joining.
11///
12/// If you want
13/// duplicates eliminated from the inputs, use the [`join`](#join) operator.
14///
15/// For example:
16/// ```dfir
17/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
18/// rhs = source_iter([("a", "hydro")]) -> tee();
19///
20/// lhs -> [0]multiset_join;
21/// rhs -> [1]multiset_join;
22/// multiset_join = join_multiset() -> assert_eq([("a", (0, "hydro")), ("a", (0, "hydro"))]);
23///
24/// lhs -> [0]set_join;
25/// rhs -> [1]set_join;
26/// set_join = join() -> assert_eq([("a", (0, "hydro"))]);
27/// ```
28pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
29    name: "join_multiset",
30    categories: &[OperatorCategory::MultiIn],
31    hard_range_inn: &(2..=2),
32    soft_range_inn: &(2..=2),
33    hard_range_out: RANGE_1,
34    soft_range_out: RANGE_1,
35    num_args: 0,
36    persistence_args: &(0..=2),
37    type_args: RANGE_0,
38    is_external_input: false,
39    has_singleton_output: false,
40    flo_type: None,
41    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
42    ports_out: None,
43    input_delaytype_fn: |_| None,
44    write_fn: |wc @ &WriteContextArgs {
45                   root,
46                   op_span,
47                   op_inst: op_inst @ OperatorInstance { .. },
48                   ..
49               },
50               diagnostics| {
51        let join_type = parse_quote_spanned! {op_span=> // Uses `lat_type.span()`!
52            #root::compiled::pull::HalfMultisetJoinState
53        };
54
55        let wc = WriteContextArgs {
56            op_inst: &OperatorInstance {
57                generics: OpInstGenerics {
58                    type_args: vec![join_type],
59                    ..wc.op_inst.generics.clone()
60                },
61                ..op_inst.clone()
62            },
63            ..wc.clone()
64        };
65
66        (super::join::JOIN.write_fn)(&wc, diagnostics)
67    },
68};