dfir_lang/graph/ops/source_interval.rs
1use quote::quote_spanned;
2use syn::parse_quote_spanned;
3
4use super::{
5 FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
6 RANGE_1,
7};
8
9/// > 0 input streams, 1 output stream
10///
11/// > Arguments: A [`Duration`](https://doc.rust-lang.org/stable/std/time/struct.Duration.html) for this interval.
12///
13/// Emits units `()` on a repeated interval. The first tick completes immediately. Missed ticks will
14/// be scheduled as soon as possible.
15///
16/// Note that this requires the dfir instance be run within a [Tokio `Runtime`](https://docs.rs/tokio/1/tokio/runtime/struct.Runtime.html).
17/// The easiest way to do this is with a [`#[dfir_rs::main]`](https://hydro.run/rustdoc/dfir_rs/attr.main)
18/// annotation on `async fn main() { ... }` as in the example below.
19///
20/// ```rustbook
21/// use std::time::Duration;
22/// use std::time::Instant;
23///use dfir_rs::dfir_syntax;
24///
25/// #[dfir_rs::main]
26/// async fn main() {
27/// let mut hf = dfir_syntax! {
28/// source_interval(Duration::from_secs(1))
29/// -> map(|_| { Instant::now() } )
30/// -> for_each(|time| println!("This runs every second: {:?}", time));
31/// };
32///
33/// // Will print 4 times (fencepost counting).
34/// tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async())
35/// .await
36/// .expect_err("Expected time out");
37///
38/// // Example output:
39/// // This runs every second: Instant { t: 27471.704813s }
40/// // This runs every second: Instant { t: 27472.704813s }
41/// // This runs every second: Instant { t: 27473.704813s }
42/// // This runs every second: Instant { t: 27474.704813s }
43/// }
44/// ```
45pub const SOURCE_INTERVAL: OperatorConstraints = OperatorConstraints {
46 name: "source_interval",
47 categories: &[OperatorCategory::Source],
48 hard_range_inn: RANGE_0,
49 soft_range_inn: RANGE_0,
50 hard_range_out: RANGE_1,
51 soft_range_out: RANGE_1,
52 num_args: 1,
53 persistence_args: RANGE_0,
54 type_args: RANGE_0,
55 is_external_input: true,
56 has_singleton_output: false,
57 flo_type: Some(FloType::Source),
58 ports_inn: None,
59 ports_out: None,
60 input_delaytype_fn: |_| None,
61 write_fn: |wc @ &WriteContextArgs {
62 root,
63 op_span,
64 arguments,
65 ..
66 },
67 diagnostics| {
68 let ident_intervalstream = wc.make_ident("intervalstream");
69 let mut write_prologue = quote_spanned! {op_span=>
70 let #ident_intervalstream =
71 #root::tokio_stream::StreamExt::map(
72 #root::tokio_stream::wrappers::IntervalStream::new(#root::tokio::time::interval(#arguments)),
73 |_| { }
74 );
75 };
76 let wc = WriteContextArgs {
77 arguments: &parse_quote_spanned!(op_span=> #ident_intervalstream),
78 ..wc.clone()
79 };
80 let write_output = (super::source_stream::SOURCE_STREAM.write_fn)(&wc, diagnostics)?;
81 write_prologue.extend(write_output.write_prologue);
82 Ok(OperatorWriteOutput {
83 write_prologue,
84 ..write_output
85 })
86 },
87};