// hydro_test/local/count_elems.rs
use hydro_lang::*;
2
3pub fn count_elems<'a, T: 'a>(
4 process: &Process<'a>,
5 input_stream: Stream<T, Process<'a>, Unbounded>,
6) -> Stream<u32, Process<'a>, Unbounded> {
7 let tick = process.tick();
8
9 let count = input_stream
10 .map(q!(|_| 1))
11 .batch(&tick, nondet!())
12 .fold(q!(|| 0), q!(|a, b| *a += b))
13 .all_ticks();
14
15 count
16}
17
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::Location;

    /// Deploys `count_elems` on a localhost process wired to an external endpoint,
    /// sends three elements before the deployment is started, and then checks that
    /// the output yields `0` followed by `3`.
    #[tokio::test]
    async fn test_count() {
        let mut deployment = Deployment::new();

        // Describe the flow: one external endpoint feeding a single process.
        let flow = hydro_lang::FlowBuilder::new();
        let external = flow.external::<()>();
        let counter = flow.process();

        let (input_port, elems) = counter.source_external_bincode(&external);
        let counts = super::count_elems(&counter, elems);
        let counts_port = counts.send_bincode_external(&external);

        // Place both the process and the external endpoint on localhost.
        let optimized = flow.with_default_optimize();
        let nodes = optimized
            .with_process(&counter, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut sink = nodes.connect_sink_bincode(input_port).await;
        let mut results = nodes.connect_source_bincode(counts_port).await;

        // All three elements are sent before the deployment is started.
        for _ in 0..3 {
            sink.send(1).await.unwrap();
        }

        deployment.start().await.unwrap();

        // Outputs are checked in order: first 0, then 3.
        for expected in [0, 3] {
            assert_eq!(results.next().await.unwrap(), expected);
        }
    }
}