dfir_lang/graph/ops/
source_interval.rs

1use quote::quote_spanned;
2use syn::parse_quote_spanned;
3
4use super::{
5    FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
6    RANGE_1,
7};
8
9/// > 0 input streams, 1 output stream
10///
11/// > Arguments: A [`Duration`](https://doc.rust-lang.org/stable/std/time/struct.Duration.html) for this interval.
12///
13/// Emits units `()` on a repeated interval. The first tick completes immediately. Missed ticks will
14/// be scheduled as soon as possible.
15///
16/// Note that this requires the dfir instance be run within a [Tokio `Runtime`](https://docs.rs/tokio/1/tokio/runtime/struct.Runtime.html).
17/// The easiest way to do this is with a [`#[dfir_rs::main]`](https://hydro.run/rustdoc/dfir_rs/attr.main)
18/// annotation on `async fn main() { ... }` as in the example below.
19///
20/// ```rustbook
21/// use std::time::Duration;
22/// use std::time::Instant;
23///use dfir_rs::dfir_syntax;
24///
25/// #[dfir_rs::main]
26/// async fn main() {
27///     let mut hf = dfir_syntax! {
28///         source_interval(Duration::from_secs(1))
29///             -> map(|_| { Instant::now() } )
30///             -> for_each(|time| println!("This runs every second: {:?}", time));
31///     };
32///
33///     // Will print 4 times (fencepost counting).
34///     tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async())
35///         .await
36///         .expect_err("Expected time out");
37///
38///     // Example output:
39///     // This runs every second: Instant { t: 27471.704813s }
40///     // This runs every second: Instant { t: 27472.704813s }
41///     // This runs every second: Instant { t: 27473.704813s }
42///     // This runs every second: Instant { t: 27474.704813s }
43/// }
44/// ```
45pub const SOURCE_INTERVAL: OperatorConstraints = OperatorConstraints {
46    name: "source_interval",
47    categories: &[OperatorCategory::Source],
48    hard_range_inn: RANGE_0,
49    soft_range_inn: RANGE_0,
50    hard_range_out: RANGE_1,
51    soft_range_out: RANGE_1,
52    num_args: 1,
53    persistence_args: RANGE_0,
54    type_args: RANGE_0,
55    is_external_input: true,
56    has_singleton_output: false,
57    flo_type: Some(FloType::Source),
58    ports_inn: None,
59    ports_out: None,
60    input_delaytype_fn: |_| None,
61    write_fn: |wc @ &WriteContextArgs {
62                   root,
63                   op_span,
64                   arguments,
65                   ..
66               },
67               diagnostics| {
68        let ident_intervalstream = wc.make_ident("intervalstream");
69        let mut write_prologue = quote_spanned! {op_span=>
70            let #ident_intervalstream =
71                #root::tokio_stream::StreamExt::map(
72                    #root::tokio_stream::wrappers::IntervalStream::new(#root::tokio::time::interval(#arguments)),
73                    |_| {  }
74                );
75        };
76        let wc = WriteContextArgs {
77            arguments: &parse_quote_spanned!(op_span=> #ident_intervalstream),
78            ..wc.clone()
79        };
80        let write_output = (super::source_stream::SOURCE_STREAM.write_fn)(&wc, diagnostics)?;
81        write_prologue.extend(write_output.write_prologue);
82        Ok(OperatorWriteOutput {
83            write_prologue,
84            ..write_output
85        })
86    },
87};