hydro_test_local_macro/local/
count_elems.rs
1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8pub fn count_elems_generic<'a, T: 'a>(
9 flow: FlowBuilder<'a>,
10 input_stream: RuntimeData<UnboundedReceiverStream<T>>,
11 output: RuntimeData<&'a UnboundedSender<u32>>,
12) -> impl Quoted<'a, Dfir<'a>> {
13 let process = flow.process::<()>();
14 let tick = process.tick();
15
16 let source = process.source_stream(input_stream);
17 let count = unsafe {
18 source.map(q!(|_| 1)).tick_batch(&tick)
20 }
21 .fold(q!(|| 0), q!(|a, b| *a += b))
22 .all_ticks();
23
24 count.for_each(q!(|v| {
25 output.send(v).unwrap();
26 }));
27
28 flow.compile_no_network::<SingleProcessGraph>()
29}
30
31#[stageleft::entry]
32pub fn count_elems<'a>(
33 flow: FlowBuilder<'a>,
34 input_stream: RuntimeData<UnboundedReceiverStream<usize>>,
35 output: RuntimeData<&'a UnboundedSender<u32>>,
36) -> impl Quoted<'a, Dfir<'a>> {
37 count_elems_generic(flow, input_stream, output)
38}
39
// Compiled only for test builds where the `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use dfir_rs::assert_graphvis_snapshots;
    use dfir_rs::util::collect_ready;

    /// Feeds three elements into the flow, runs one tick, and expects a
    /// single output value of `3`.
    #[test]
    pub fn test_count() {
        let (feed, input) = dfir_rs::util::unbounded_channel();
        let (out, mut results) = dfir_rs::util::unbounded_channel();

        let mut count = super::count_elems!(input, &out);
        assert_graphvis_snapshots!(count);

        // Queue all inputs before the tick so they are counted together.
        for _ in 0..3 {
            feed.send(1).unwrap();
        }

        count.run_tick();

        let observed: Vec<_> = collect_ready(&mut results);
        assert_eq!(observed, vec![3]);
    }
}