hydro_test/local/
count_elems.rs1use hydro_lang::prelude::*;
2
3pub fn count_elems<'a, T: 'a>(
4 input_stream: Stream<T, Process<'a>, Unbounded>,
5) -> Stream<u32, Process<'a>, Unbounded> {
6 sliced! {
7 let batch = use(input_stream.map(q!(|_| 1)), nondet!());
8 batch.fold(q!(|| 0), q!(|a, b| *a += b)).into_stream()
9 }
10}
11
12#[cfg(test)]
13mod tests {
14 use futures::{SinkExt, StreamExt};
15 use hydro_deploy::Deployment;
16 use hydro_lang::location::Location;
17
18 #[tokio::test]
19 async fn test_count() {
20 let mut deployment = Deployment::new();
21
22 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
23 let external = builder.external::<()>();
24 let p1 = builder.process();
25
26 let (input_send, input) = p1.source_external_bincode(&external);
27 let out = super::count_elems(input);
28 let out_recv = out.send_bincode_external(&external);
29
30 let built = builder.with_default_optimize();
31 let nodes = built
32 .with_process(&p1, deployment.Localhost())
33 .with_external(&external, deployment.Localhost())
34 .deploy(&mut deployment);
35
36 deployment.deploy().await.unwrap();
37
38 let mut input_send = nodes.connect(input_send).await;
39 let mut out_recv = nodes.connect(out_recv).await;
40
41 input_send.send(1).await.unwrap();
43 input_send.send(1).await.unwrap();
44 input_send.send(1).await.unwrap();
45
46 deployment.start().await.unwrap();
47
48 assert_eq!(out_recv.next().await.unwrap(), 3);
49 }
50}