hydro_test_local/local/
count_elems.rs

1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8pub fn count_elems_generic<'a, T: 'a>(
9    flow: FlowBuilder<'a>,
10    input_stream: RuntimeData<UnboundedReceiverStream<T>>,
11    output: RuntimeData<&'a UnboundedSender<u32>>,
12) -> impl Quoted<'a, Dfir<'a>> {
13    let process = flow.process::<()>();
14    let tick = process.tick();
15
16    let source = process.source_stream(input_stream);
17    let count = unsafe {
18        // SAFETY: intentionally using ticks
19        source.map(q!(|_| 1)).tick_batch(&tick)
20    }
21    .fold(q!(|| 0), q!(|a, b| *a += b))
22    .all_ticks();
23
24    count.for_each(q!(|v| {
25        output.send(v).unwrap();
26    }));
27
28    flow.compile_no_network::<SingleProcessGraph>()
29}
30
31#[stageleft::entry]
32pub fn count_elems<'a>(
33    flow: FlowBuilder<'a>,
34    input_stream: RuntimeData<UnboundedReceiverStream<usize>>,
35    output: RuntimeData<&'a UnboundedSender<u32>>,
36) -> impl Quoted<'a, Dfir<'a>> {
37    count_elems_generic(flow, input_stream, output)
38}
39
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use dfir_rs::assert_graphvis_snapshots;
    use dfir_rs::util::collect_ready;

    /// Feeds three elements into the counter, runs a single tick, and checks
    /// that exactly one count of `3` comes out.
    #[test]
    pub fn test_count() {
        let (elem_tx, elem_rx) = dfir_rs::util::unbounded_channel();
        let (count_tx, mut count_rx) = dfir_rs::util::unbounded_channel();

        let mut graph = super::count_elems!(elem_rx, &count_tx);
        assert_graphvis_snapshots!(graph);

        // Queue all inputs before the tick runs.
        for _ in 0..3 {
            elem_tx.send(1).unwrap();
        }

        graph.run_tick();

        let counts = collect_ready::<Vec<_>, _>(&mut count_rx);
        assert_eq!(&*counts, &[3]);
    }
}