use quote::{quote_spanned, ToTokens};
use syn::parse_quote;
use syn::spanned::Spanned;
use super::join_fused::{make_joindata, parse_argument, parse_persistences, JoinOptions};
use super::{
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, Persistence, PortIndexValue, WriteContextArgs, RANGE_0, RANGE_1,
};
use crate::diagnostic::{Diagnostic, Level};
pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints {
name: "join_fused_lhs",
categories: &[OperatorCategory::MultiIn],
hard_range_inn: &(2..=2),
soft_range_inn: &(2..=2),
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 1,
persistence_args: &(0..=2),
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
flo_type: None,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |idx| match idx {
PortIndexValue::Int(path) if "0" == path.to_token_stream().to_string() => {
Some(DelayType::Stratum)
}
_ => None,
},
write_fn: |wc @ &WriteContextArgs {
context,
hydroflow,
op_span,
ident,
inputs,
is_pull,
op_inst:
OperatorInstance {
generics:
OpInstGenerics {
persistence_args, ..
},
..
},
arguments,
..
},
diagnostics| {
assert!(is_pull);
let arg0_span = arguments[0].span();
let persistences = parse_persistences(persistence_args);
let lhs_join_options =
parse_argument(&arguments[0]).map_err(|err| diagnostics.push(err))?;
let (lhs_joindata_ident, lhs_borrow_ident, lhs_prologue, lhs_borrow) =
make_joindata(wc, persistences[0], &lhs_join_options, "lhs")
.map_err(|err| diagnostics.push(err))?;
let rhs_joindata_ident = wc.make_ident("rhs_joindata");
let rhs_borrow_ident = wc.make_ident("rhs_joindata_borrow_ident");
let write_prologue_rhs = match persistences[1] {
Persistence::Tick => quote_spanned! {op_span=>},
Persistence::Static => quote_spanned! {op_span=>
let #rhs_joindata_ident = #hydroflow.add_state(::std::cell::RefCell::new(
::std::vec::Vec::new()
));
},
Persistence::Mutable => {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
"An implementation of 'mutable does not exist",
));
return Err(());
}
};
let write_prologue = quote_spanned! {op_span=>
#lhs_prologue
#write_prologue_rhs
};
let lhs = &inputs[0];
let rhs = &inputs[1];
let lhs_fold_or_reduce_into_from = match lhs_join_options {
JoinOptions::FoldFrom(lhs_from, lhs_fold) => quote_spanned! {arg0_span=>
#lhs_borrow.fold_into(#lhs, #lhs_fold, #lhs_from);
},
JoinOptions::Fold(lhs_default, lhs_fold) => quote_spanned! {arg0_span=>
#lhs_borrow.fold_into(#lhs, #lhs_fold, #lhs_default);
},
JoinOptions::Reduce(lhs_reduce) => quote_spanned! {arg0_span=>
#lhs_borrow.reduce_into(#lhs, #lhs_reduce);
},
};
let write_iterator = match persistences[1] {
Persistence::Tick => quote_spanned! {op_span=>
let mut #lhs_borrow_ident = #context.state_ref(#lhs_joindata_ident).borrow_mut();
let #ident = {
#lhs_fold_or_reduce_into_from
#[allow(clippy::clone_on_copy)]
#rhs.filter_map(|(k, v2)| #lhs_borrow.table.get(&k).map(|v1| (k, (v1.clone(), v2.clone()))))
};
},
Persistence::Static => quote_spanned! {op_span=>
let mut #lhs_borrow_ident = #context.state_ref(#lhs_joindata_ident).borrow_mut();
let mut #rhs_borrow_ident = #context.state_ref(#rhs_joindata_ident).borrow_mut();
let #ident = {
#lhs_fold_or_reduce_into_from
#[allow(clippy::clone_on_copy)]
#[allow(suspicious_double_ref_op)]
if #context.is_first_run_this_tick() {
#rhs_borrow_ident.extend(#rhs);
#rhs_borrow_ident.iter()
} else {
let len = #rhs_borrow_ident.len();
#rhs_borrow_ident.extend(#rhs);
#rhs_borrow_ident[len..].iter()
}
.filter_map(|(k, v2)| #lhs_borrow.table.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone()))))
};
},
Persistence::Mutable => {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
"An implementation of 'mutable does not exist",
));
return Err(());
}
};
let write_iterator_after =
if persistences[0] == Persistence::Static || persistences[1] == Persistence::Static {
quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), false);
}
} else {
quote_spanned! {op_span=>}
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
write_iterator_after,
})
},
};