1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
use syn::{parse_quote, parse_quote_spanned};
use super::{
OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
WriteContextArgs, RANGE_0, RANGE_1,
};
/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
///
/// Behaves like [`join`](#join), except that each input side is collected into a multiset
/// rather than a set before joining, so duplicate input items are kept and yield duplicate
/// output items.
///
/// To have duplicates removed from the inputs instead, use the [`join`](#join) operator.
///
/// For example:
/// ```dfir
/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
/// rhs = source_iter([("a", "hydro")]) -> tee();
///
/// lhs -> [0]multiset_join;
/// rhs -> [1]multiset_join;
/// multiset_join = join_multiset() -> assert_eq([("a", (0, "hydro")), ("a", (0, "hydro"))]);
///
/// lhs -> [0]set_join;
/// rhs -> [1]set_join;
/// set_join = join() -> assert_eq([("a", (0, "hydro"))]);
/// ```
pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
    name: "join_multiset",
    categories: &[OperatorCategory::MultiIn],
    // Exactly two inputs, on the fixed ports `0` and `1` (see `ports_inn`).
    hard_range_inn: &(2..=2),
    soft_range_inn: &(2..=2),
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 0,
    persistence_args: &(0..=2),
    // No user-supplied type arguments; `write_fn` fills in the join state type itself.
    type_args: RANGE_0,
    is_external_input: false,
    has_singleton_output: false,
    flo_type: None,
    ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
    ports_out: None,
    input_delaytype_fn: |_| None,
    // Code generation is delegated to `join`: the operator instance is copied with its
    // type arguments replaced by the multiset half-join state, then handed to `JOIN.write_fn`.
    write_fn: |wc @ &WriteContextArgs { root, op_span, .. }, diagnostics| {
        // The generated path tokens carry this operator's span (`op_span`).
        let multiset_state = parse_quote_spanned! {op_span=>
            #root::compiled::pull::HalfMultisetJoinState
        };
        // Keep every other generic of the original instance; only `type_args` is overridden.
        let generics = OpInstGenerics {
            type_args: vec![multiset_state],
            ..wc.op_inst.generics.clone()
        };
        let op_inst = OperatorInstance {
            generics,
            ..wc.op_inst.clone()
        };
        let join_wc = WriteContextArgs {
            op_inst: &op_inst,
            ..wc.clone()
        };
        (super::join::JOIN.write_fn)(&join_wc, diagnostics)
    },
};