dfir_lang/graph/ops/
source_stdin.rs
1use quote::quote_spanned;
2
3use super::{
4 FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
5 RANGE_1,
6};
7
8pub const SOURCE_STDIN: OperatorConstraints = OperatorConstraints {
21 name: "source_stdin",
22 categories: &[OperatorCategory::Source],
23 hard_range_inn: RANGE_0,
24 soft_range_inn: RANGE_0,
25 hard_range_out: RANGE_1,
26 soft_range_out: RANGE_1,
27 num_args: 0,
28 persistence_args: RANGE_0,
29 type_args: RANGE_0,
30 is_external_input: true,
31 has_singleton_output: false,
32 flo_type: Some(FloType::Source),
33 ports_inn: None,
34 ports_out: None,
35 input_delaytype_fn: |_| None,
36 write_fn: |wc @ &WriteContextArgs {
37 root,
38 context,
39 op_span,
40 ident,
41 ..
42 },
43 _| {
44 let stream_ident = wc.make_ident("stream");
45 let write_prologue = quote_spanned! {op_span=>
46 #[expect(clippy::let_and_return, reason = "gives return value a self-documenting name")]
47 let mut #stream_ident = {
48 use #root::tokio::io::AsyncBufReadExt;
49 let reader = #root::tokio::io::BufReader::new(#root::tokio::io::stdin());
50 let stdin_lines = #root::tokio_stream::wrappers::LinesStream::new(reader.lines());
51 stdin_lines
52 };
53 };
54 let write_iterator = quote_spanned! {op_span=>
55 let #ident = std::iter::from_fn(|| {
56 match #root::futures::stream::Stream::poll_next(std::pin::Pin::new(&mut #stream_ident), &mut std::task::Context::from_waker(&#context.waker())) {
57 std::task::Poll::Ready(maybe) => maybe,
58 std::task::Poll::Pending => None,
59 }
60 });
61 };
62 Ok(OperatorWriteOutput {
63 write_prologue,
64 write_iterator,
65 ..Default::default()
66 })
67 },
68};