dfir_lang/graph/ops/
cross_singleton.rs
1use quote::{ToTokens, quote_spanned};
2use syn::parse_quote;
3
4use super::{
5 DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1,
6 WriteContextArgs,
7};
8use crate::graph::PortIndexValue;
9
10pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints {
31 name: "cross_singleton",
32 categories: &[OperatorCategory::MultiIn],
33 persistence_args: RANGE_0,
34 type_args: RANGE_0,
35 hard_range_inn: &(2..=2),
36 soft_range_inn: &(2..=2),
37 hard_range_out: RANGE_1,
38 soft_range_out: RANGE_1,
39 num_args: 0,
40 is_external_input: false,
41 has_singleton_output: false,
42 flo_type: None,
43 ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })),
44 ports_out: None,
45 input_delaytype_fn: |idx| match idx {
46 PortIndexValue::Path(path) if "single" == path.to_token_stream().to_string() => {
47 Some(DelayType::Stratum)
48 }
49 _else => None,
50 },
51 write_fn: |wc @ &WriteContextArgs {
52 root,
53 context,
54 df_ident,
55 ident,
56 op_span,
57 inputs,
58 is_pull,
59 work_fn,
60 ..
61 },
62 _diagnostics| {
63 assert!(is_pull);
64
65 let stream_input = &inputs[0];
66 let singleton_input = &inputs[1];
67 let singleton_handle_ident = wc.make_ident("singleton_handle");
68
69 let write_prologue = quote_spanned! {op_span=>
70 let #singleton_handle_ident = #df_ident.add_state(
71 ::std::cell::RefCell::new(::std::option::Option::None)
72 );
73 #df_ident.set_state_lifespan_hook(#singleton_handle_ident, #root::scheduled::graph::StateLifespan::Tick, |rcell| { rcell.take(); });
75 };
76
77 let write_iterator = quote_spanned! {op_span=>
78 let #ident = {
79 #[inline(always)]
80 fn cross_singleton_guard<Singleton, Item, SingletonIter, Stream>(
81 mut singleton_state_mut: std::cell::RefMut<'_, Option<Singleton>>,
82 mut singleton_input: SingletonIter,
83 stream_input: Stream,
84 ) -> impl use<Item, Singleton, Stream, SingletonIter> + Iterator<Item = (Item, Singleton)>
85 where
86 Singleton: ::std::clone::Clone,
87 SingletonIter: Iterator<Item = Singleton>,
88 Stream: Iterator<Item = Item>,
89 {
90 let singleton_value_opt = #work_fn(|| match &*singleton_state_mut {
91 ::std::option::Option::Some(singleton_value) => Some(singleton_value.clone()),
92 ::std::option::Option::None => {
93 let singleton_value_opt = singleton_input.next();
94 *singleton_state_mut = singleton_value_opt.clone();
95 singleton_value_opt
96 }
97 });
98 singleton_value_opt
99 .map(|singleton_value| {
100 stream_input.map(move |item| (item, ::std::clone::Clone::clone(&singleton_value)))
101 })
102 .into_iter()
103 .flatten()
104 }
105 cross_singleton_guard(
106 unsafe {
107 #context.state_ref_unchecked(#singleton_handle_ident)
109 }.borrow_mut(),
110 #singleton_input,
111 #stream_input,
112 )
113 };
114 };
115
116 Ok(OperatorWriteOutput {
117 write_prologue,
118 write_iterator,
119 ..Default::default()
120 })
121 },
122};