Skip to main content

hydro_test/local/
count_elems.rs

1use hydro_lang::prelude::*;
2
3pub fn count_elems<'a, T: 'a>(
4    input_stream: Stream<T, Process<'a>, Unbounded>,
5) -> Stream<u32, Process<'a>, Unbounded> {
6    sliced! {
7        let batch = use(input_stream.map(q!(|_| 1)), nondet!(/** test */));
8        batch.fold(q!(|| 0), q!(|a, b| *a += b)).into_stream()
9    }
10}
11
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::location::Location;

    /// Wires `count_elems` between an external bincode source and an external
    /// bincode sink on localhost, pushes three values in before the deployment
    /// is started, and expects the first emitted count to be `3`.
    #[tokio::test]
    async fn test_count() {
        let mut deployment = Deployment::new();

        // Describe the dataflow: external input -> count_elems -> external output.
        let mut flow = hydro_lang::compile::builder::FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process();

        let (input_port, input) = process.source_external_bincode(&external);
        let counts = super::count_elems(input);
        let output_port = counts.send_bincode_external(&external);

        // Place both the process and the external endpoint on localhost.
        let nodes = flow
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut input_sink = nodes.connect(input_port).await;
        let mut output_stream = nodes.connect(output_port).await;

        // send before starting so everything shows up in single tick
        for _ in 0..3 {
            input_sink.send(1).await.unwrap();
        }

        deployment.start().await.unwrap();

        let first_count = output_stream.next().await.unwrap();
        assert_eq!(first_count, 3);
    }
}