dfir_lang/graph/ops/
source_stream_serde.rs
1use quote::quote_spanned;
2
3use super::{
4 FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
5 RANGE_1,
6};
7
8pub const SOURCE_STREAM_SERDE: OperatorConstraints = OperatorConstraints {
28 name: "source_stream_serde",
29 categories: &[OperatorCategory::Source],
30 hard_range_inn: RANGE_0,
31 soft_range_inn: RANGE_0,
32 hard_range_out: RANGE_1,
33 soft_range_out: RANGE_1,
34 num_args: 1,
35 persistence_args: RANGE_0,
36 type_args: RANGE_0,
37 is_external_input: true,
38 has_singleton_output: false,
39 flo_type: Some(FloType::Source),
40 ports_inn: None,
41 ports_out: None,
42 input_delaytype_fn: |_| None,
43 write_fn: |wc @ &WriteContextArgs {
44 root,
45 context,
46 op_span,
47 ident,
48 arguments,
49 ..
50 },
51 _| {
52 let receiver = &arguments[0];
53 let stream_ident = wc.make_ident("stream");
54 let write_prologue = quote_spanned! {op_span=>
55 let mut #stream_ident = Box::pin(#receiver);
56 };
57 let write_iterator = quote_spanned! {op_span=>
58 let #ident = std::iter::from_fn(|| {
59 match #root::futures::stream::Stream::poll_next(#stream_ident.as_mut(), &mut std::task::Context::from_waker(&#context.waker())) {
60 std::task::Poll::Ready(Some(std::result::Result::Ok((payload, addr)))) => Some(#root::util::deserialize_from_bytes(payload).map(|payload| (payload, addr))),
61 std::task::Poll::Ready(Some(Err(_))) => None,
62 std::task::Poll::Ready(None) => None,
63 std::task::Poll::Pending => None,
64 }
65 });
66 };
67 Ok(OperatorWriteOutput {
68 write_prologue,
69 write_iterator,
70 ..Default::default()
71 })
72 },
73};