1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use syn::{parse_quote, parse_quote_spanned};

use super::{
    OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
    WriteContextArgs, RANGE_0, RANGE_1,
};

/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
///
/// This operator is equivalent to `join` except that the LHS and RHS are collected into multisets rather than sets before joining.
///
/// If you want
/// duplicates eliminated from the inputs, use the [`join`](#join) operator.
///
/// For example:
/// ```dfir
/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
/// rhs = source_iter([("a", "hydro")]) -> tee();
///
/// lhs -> [0]multiset_join;
/// rhs -> [1]multiset_join;
/// multiset_join = join_multiset() -> assert_eq([("a", (0, "hydro")), ("a", (0, "hydro"))]);
///
/// lhs -> [0]set_join;
/// rhs -> [1]set_join;
/// set_join = join() -> assert_eq([("a", (0, "hydro"))]);
/// ```
pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
    name: "join_multiset",
    categories: &[OperatorCategory::MultiIn],
    hard_range_inn: &(2..=2),
    soft_range_inn: &(2..=2),
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 0,
    persistence_args: &(0..=2),
    type_args: RANGE_0,
    is_external_input: false,
    has_singleton_output: false,
    flo_type: None,
    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
    ports_out: None,
    input_delaytype_fn: |_| None,
    write_fn: |wc @ &WriteContextArgs {
                   root,
                   op_span,
                   op_inst: op_inst @ OperatorInstance { .. },
                   ..
               },
               diagnostics| {
        let join_type = parse_quote_spanned! {op_span=> // Uses `lat_type.span()`!
            #root::compiled::pull::HalfMultisetJoinState
        };

        let wc = WriteContextArgs {
            op_inst: &OperatorInstance {
                generics: OpInstGenerics {
                    type_args: vec![join_type],
                    ..wc.op_inst.generics.clone()
                },
                ..op_inst.clone()
            },
            ..wc.clone()
        };

        (super::join::JOIN.write_fn)(&wc, diagnostics)
    },
};