use quote::quote_spanned;
use super::{
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, Persistence, WriteContextArgs, RANGE_0, RANGE_1,
};
use crate::diagnostic::{Diagnostic, Level};
/// `persist_mut_keyed()` is similar to `persist_mut()` except that it also enables key-based deletions.
/// `persist_mut_keyed()` expects an input of type [`PersistenceKeyed<K, V>`](https://docs.rs/hydroflow/latest/hydroflow/util/enum.PersistenceKeyed.html),
/// and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists happen in the order they are received in the stream, and `Delete(k)` removes every value stored under `k`.
/// For example, `[Persist(0, 1), Delete(0), Persist(0, 1)]` will result in a single `(0, 1)` entry being stored.
///
/// ```dfir
/// use dfir_rs::util::PersistenceKeyed;
///
/// source_iter([
/// PersistenceKeyed::Persist(0, 1),
/// PersistenceKeyed::Persist(1, 1),
/// PersistenceKeyed::Delete(1),
/// ])
/// -> persist_mut_keyed::<'static>()
/// -> assert_eq([(0, 1)]);
/// ```
pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints {
name: "persist_mut_keyed",
categories: &[OperatorCategory::Persistence],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
persistence_args: RANGE_1,
type_args: RANGE_0,
is_external_input: false,
// If this is set to true, the state will need to be cleared using `#context.set_state_tick_hook`
// to prevent reading uncleared data if this subgraph doesn't run.
// https://github.com/hydro-project/hydro/issues/1298
// This applies if `'tick` lifetimes are added.
has_singleton_output: false,
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
write_fn: |wc @ &WriteContextArgs {
root,
context,
hydroflow,
op_span,
ident,
inputs,
is_pull,
op_name,
op_inst:
OperatorInstance {
generics:
OpInstGenerics {
persistence_args, ..
},
..
},
..
},
diagnostics| {
assert!(is_pull);
if [Persistence::Static] != persistence_args[..] {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
format!("{} only supports `'static`.", op_name),
));
}
let persistdata_ident = wc.make_ident("persistdata");
let vec_ident = wc.make_ident("persistvec");
let write_prologue = quote_spanned! {op_span=>
let #persistdata_ident = #hydroflow.add_state(::std::cell::RefCell::new(
#root::rustc_hash::FxHashMap::<_, #root::util::sparse_vec::SparseVec<_>>::default()
));
};
let write_iterator = {
let input = &inputs[0];
quote_spanned! {op_span=>
let mut #vec_ident = #context.state_ref(#persistdata_ident).borrow_mut();
let #ident = {
#[inline(always)]
fn check_iter<K, V>(iter: impl Iterator<Item = #root::util::PersistenceKeyed::<K, V>>) -> impl Iterator<Item = #root::util::PersistenceKeyed::<K, V>> {
iter
}
if #context.is_first_run_this_tick() {
for item in check_iter(#input) {
match item {
#root::util::PersistenceKeyed::Persist(k, v) => {
#vec_ident.entry(k).or_default().push(v);
},
#root::util::PersistenceKeyed::Delete(k) => {
#vec_ident.remove(&k);
}
}
}
#[allow(clippy::clone_on_copy)]
Some(#vec_ident.iter()
.flat_map(|(k, v)| v.iter().map(move |v| (k.clone(), v.clone()))))
.into_iter()
.flatten()
} else {
None.into_iter().flatten()
}
};
}
};
// Schedule this subgraph to run again so the persisted state can be replayed.
let write_iterator_after = quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), false);
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
write_iterator_after,
})
},
};