1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use quote::quote_spanned;
use syn::parse_quote_spanned;

use super::{
    FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0,
    RANGE_1,
};

/// > 0 input streams, 1 output stream
///
/// > Arguments: A [`Duration`](https://doc.rust-lang.org/stable/std/time/struct.Duration.html) for this interval.
///
/// Emits units `()` on a repeated interval. The first tick completes immediately. Missed ticks will
/// be scheduled as soon as possible.
///
/// Note that this requires the dfir instance be run within a [Tokio `Runtime`](https://docs.rs/tokio/1/tokio/runtime/struct.Runtime.html).
/// The easiest way to do this is with a [`#[dfir_rs::main]`](https://hydro.run/rustdoc/dfir_rs/attr.main)
/// annotation on `async fn main() { ... }` as in the example below.
///
/// ```rustbook
/// use std::time::Duration;
/// use std::time::Instant;
///use dfir_rs::dfir_syntax;
///
/// #[dfir_rs::main]
/// async fn main() {
///     let mut hf = dfir_syntax! {
///         source_interval(Duration::from_secs(1))
///             -> map(|_| { Instant::now() } )
///             -> for_each(|time| println!("This runs every second: {:?}", time));
///     };
///
///     // Will print 4 times (fencepost counting).
///     tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async())
///         .await
///         .expect_err("Expected time out");
///
///     // Example output:
///     // This runs every second: Instant { t: 27471.704813s }
///     // This runs every second: Instant { t: 27472.704813s }
///     // This runs every second: Instant { t: 27473.704813s }
///     // This runs every second: Instant { t: 27474.704813s }
/// }
/// ```
pub const SOURCE_INTERVAL: OperatorConstraints = OperatorConstraints {
    name: "source_interval",
    categories: &[OperatorCategory::Source],
    hard_range_inn: RANGE_0,
    soft_range_inn: RANGE_0,
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 1,
    persistence_args: RANGE_0,
    type_args: RANGE_0,
    is_external_input: true,
    has_singleton_output: false,
    flo_type: Some(FloType::Source),
    ports_inn: None,
    ports_out: None,
    input_delaytype_fn: |_| None,
    write_fn: |wc @ &WriteContextArgs {
                   root,
                   op_span,
                   arguments,
                   ..
               },
               diagnostics| {
        let ident_intervalstream = wc.make_ident("intervalstream");
        let mut write_prologue = quote_spanned! {op_span=>
            let #ident_intervalstream =
                #root::tokio_stream::StreamExt::map(
                    #root::tokio_stream::wrappers::IntervalStream::new(#root::tokio::time::interval(#arguments)),
                    |_| {  }
                );
        };
        let wc = WriteContextArgs {
            arguments: &parse_quote_spanned!(op_span=> #ident_intervalstream),
            ..wc.clone()
        };
        let write_output = (super::source_stream::SOURCE_STREAM.write_fn)(&wc, diagnostics)?;
        write_prologue.extend(write_output.write_prologue);
        Ok(OperatorWriteOutput {
            write_prologue,
            ..write_output
        })
    },
};