dfir_lang/graph/ops/
source_stream.rs
1use quote::quote_spanned;
2
3use super::{
4 FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
5 RANGE_1,
6};
7
8pub const SOURCE_STREAM: OperatorConstraints = OperatorConstraints {
28 name: "source_stream",
29 categories: &[OperatorCategory::Source],
30 hard_range_inn: RANGE_0,
31 soft_range_inn: RANGE_0,
32 hard_range_out: RANGE_1,
33 soft_range_out: RANGE_1,
34 num_args: 1,
35 persistence_args: RANGE_0,
36 type_args: RANGE_0,
37 is_external_input: true,
38 has_singleton_output: false,
39 flo_type: Some(FloType::Source),
40 ports_inn: None,
41 ports_out: None,
42 input_delaytype_fn: |_| None,
43 write_fn: |wc @ &WriteContextArgs {
44 root,
45 context,
46 op_span,
47 ident,
48 arguments,
49 ..
50 },
51 _| {
52 let receiver = &arguments[0];
53 let stream_ident = wc.make_ident("stream");
54 let write_prologue = quote_spanned! {op_span=>
55 let mut #stream_ident = {
56 #[inline(always)]
57 fn check_stream<Stream: #root::futures::stream::Stream<Item = Item> + ::std::marker::Unpin, Item>(stream: Stream)
58 -> impl #root::futures::stream::Stream<Item = Item> + ::std::marker::Unpin
59 {
60 stream
61 }
62 check_stream(#receiver)
63 };
64 };
65 let write_iterator = quote_spanned! {op_span=>
66 let #ident = std::iter::from_fn(|| {
67 match #root::futures::stream::Stream::poll_next(::std::pin::Pin::new(&mut #stream_ident), &mut std::task::Context::from_waker(&#context.waker())) {
68 std::task::Poll::Ready(maybe) => maybe,
69 std::task::Poll::Pending => None,
70 }
71 });
72 };
73 Ok(OperatorWriteOutput {
74 write_prologue,
75 write_iterator,
76 ..Default::default()
77 })
78 },
79};