dfir_lang/graph/ops/
dest_file.rs

1use quote::quote_spanned;
2use syn::parse_quote_spanned;
3
4use super::{
5    make_missing_runtime_msg, OperatorCategory, OperatorConstraints,
6    OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
7};
8
9/// > 0 input streams, 1 output stream
10///
11/// > Arguments: (1) An [`AsRef`](https://doc.rust-lang.org/std/convert/trait.AsRef.html)`<`[`Path`](https://doc.rust-lang.org/nightly/std/path/struct.Path.html)`>`
12/// > for a file to write to, and (2) a bool `append`.
13///
14/// Consumes `String`s by writing them as lines to a file. The file will be created if it doesn't
15/// exist. Lines will be appended to the file if `append` is true, otherwise the file will be
16/// truncated before lines are written.
17///
18/// Note this operator must be used within a Tokio runtime.
19///
20/// ```dfir
21/// source_iter(1..=10) -> map(|n| format!("Line {}", n)) -> dest_file("dest.txt", false);
22/// ```
23pub const DEST_FILE: OperatorConstraints = OperatorConstraints {
24    name: "dest_file",
25    categories: &[OperatorCategory::Sink],
26    hard_range_inn: RANGE_1,
27    soft_range_inn: RANGE_1,
28    hard_range_out: RANGE_0,
29    soft_range_out: RANGE_0,
30    num_args: 2,
31    persistence_args: RANGE_0,
32    type_args: RANGE_0,
33    is_external_input: false,
34    has_singleton_output: false,
35    flo_type: None,
36    ports_inn: None,
37    ports_out: None,
38    input_delaytype_fn: |_| None,
39    write_fn: |wc @ &WriteContextArgs {
40                   root,
41                   op_span,
42                   op_name,
43                   arguments,
44                   ..
45               },
46               diagnostics| {
47        let filename_arg = &arguments[0];
48        let append_arg = &arguments[1];
49
50        let ident_filesink = wc.make_ident("filesink");
51
52        let missing_runtime_msg = make_missing_runtime_msg(op_name);
53
54        let write_prologue = quote_spanned! {op_span=>
55            let #ident_filesink = {
56                // Could use `#root::tokio::fs::OpenOptions` but only if we're in an async fn,
57                // which we can't know (right now)
58                let append = #append_arg;
59                let file = ::std::fs::OpenOptions::new()
60                    .create(true)
61                    .write(true)
62                    .append(append)
63                    .truncate(!append)
64                    .open(#filename_arg)
65                    .expect("Failed to open file for writing");
66                let file = #root::tokio::fs::File::from_std(file);
67                let bufwrite = #root::tokio::io::BufWriter::new(file);
68                let codec = #root::tokio_util::codec::LinesCodec::new();
69                #root::tokio_util::codec::FramedWrite::new(bufwrite, codec)
70            };
71        };
72        let wc = WriteContextArgs {
73            arguments: &parse_quote_spanned!(op_span=> #ident_filesink),
74            ..wc.clone()
75        };
76
77        let OperatorWriteOutput {
78            write_prologue: write_prologue_sink,
79            write_iterator,
80            write_iterator_after,
81        } = (super::dest_sink::DEST_SINK.write_fn)(&wc, diagnostics)?;
82
83        let write_prologue = quote_spanned! {op_span=>
84            #write_prologue
85            #write_prologue_sink
86        };
87        let write_iterator = quote_spanned! {op_span=>
88            ::std::debug_assert!(#root::tokio::runtime::Handle::try_current().is_ok(), #missing_runtime_msg);
89            #write_iterator
90        };
91
92        Ok(OperatorWriteOutput {
93            write_prologue,
94            write_iterator,
95            write_iterator_after,
96        })
97    },
98};