dfir_lang/graph/ops/
source_file.rs

1use quote::quote_spanned;
2use syn::parse_quote_spanned;
3
4use super::{
5    make_missing_runtime_msg, FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput,
6    WriteContextArgs, RANGE_0, RANGE_1,
7};
8
9/// > 0 input streams, 1 output stream
10///
11/// > Arguments: An [`AsRef`](https://doc.rust-lang.org/std/convert/trait.AsRef.html)`<`[`Path`](https://doc.rust-lang.org/nightly/std/path/struct.Path.html)`>`
12/// > for a file to read.
13///
14/// Reads the referenced file one line at a time. The line will NOT include the line ending.
15///
16/// Will panic if the file could not be read, or if the file contains bytes that are not valid UTF-8.
17///
18/// ```dfir
19/// source_file("Cargo.toml") -> for_each(|line| println!("{}", line));
20/// ```
21pub const SOURCE_FILE: OperatorConstraints = OperatorConstraints {
22    name: "source_file",
23    categories: &[OperatorCategory::Source],
24    hard_range_inn: RANGE_0,
25    soft_range_inn: RANGE_0,
26    hard_range_out: RANGE_1,
27    soft_range_out: RANGE_1,
28    num_args: 1,
29    persistence_args: RANGE_0,
30    type_args: &(0..=1),
31    is_external_input: true,
32    has_singleton_output: false,
33    flo_type: Some(FloType::Source),
34    ports_inn: None,
35    ports_out: None,
36    input_delaytype_fn: |_| None,
37    write_fn: |wc @ &WriteContextArgs {
38                   root,
39                   op_span,
40                   ident,
41                   op_name,
42                   arguments,
43                   ..
44               },
45               diagnostics| {
46        let filename_arg = &arguments[0];
47
48        let ident_filelines = wc.make_ident("filelines");
49
50        let missing_runtime_msg = make_missing_runtime_msg(op_name);
51
52        let write_prologue = quote_spanned! {op_span=>
53            let #ident_filelines = {
54                // Could use `let file = #root::tokio::fs::File::open(#arguments).await` directly,
55                // but only if we're in an async fn, which we can't know (right now).
56                let file = ::std::fs::File::open(#filename_arg).expect("Failed to open file for reading");
57                let file = #root::tokio::fs::File::from_std(file);
58                let bufread = #root::tokio::io::BufReader::new(file);
59                let lines = #root::tokio::io::AsyncBufReadExt::lines(bufread);
60                #root::tokio_stream::wrappers::LinesStream::new(lines)
61            };
62        };
63        let wc = WriteContextArgs {
64            arguments: &parse_quote_spanned!(op_span=> #ident_filelines),
65            ..wc.clone()
66        };
67
68        let OperatorWriteOutput {
69            write_prologue: write_prologue_stream,
70            write_iterator,
71            write_iterator_after,
72        } = (super::source_stream::SOURCE_STREAM.write_fn)(&wc, diagnostics)?;
73
74        let write_prologue = quote_spanned! {op_span=>
75            #write_prologue
76            #write_prologue_stream
77        };
78        let write_iterator = quote_spanned! {op_span=>
79            ::std::debug_assert!(#root::tokio::runtime::Handle::try_current().is_ok(), #missing_runtime_msg);
80            #write_iterator
81            // Unwrap each line. Will panic if invalid utf-8.
82            let #ident = #ident.map(|result| result.unwrap());
83        };
84
85        Ok(OperatorWriteOutput {
86            write_prologue,
87            write_iterator,
88            write_iterator_after,
89        })
90    },
91};