hydro_test/local/
count_elems.rs

1use hydro_lang::*;
2
3pub fn count_elems<'a, T: 'a>(
4    process: &Process<'a>,
5    input_stream: Stream<T, Process<'a>, Unbounded>,
6) -> Stream<u32, Process<'a>, Unbounded> {
7    let tick = process.tick();
8
9    let count = input_stream
10        .map(q!(|_| 1))
11        .batch(&tick, nondet!(/** test */))
12        .fold(q!(|| 0), q!(|a, b| *a += b))
13        .all_ticks();
14
15    count
16}
17
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::Location;

    /// Deploys `count_elems` on a localhost process, feeds it three elements
    /// through an external port, and checks the counts that come back.
    #[tokio::test]
    async fn test_count() {
        let mut deployment = Deployment::new();

        let flow = hydro_lang::FlowBuilder::new();
        let external = flow.external::<()>();
        let p1 = flow.process();

        // Wire the external input into `count_elems` and its result back out.
        let (input_port, input) = p1.source_external_bincode(&external);
        let output_port = super::count_elems(&p1, input).send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut input_send = nodes.connect_sink_bincode(input_port).await;
        let mut out_recv = nodes.connect_source_bincode(output_port).await;

        // Queue every input before starting so they all show up in a single tick.
        for _ in 0..3 {
            input_send.send(1).await.unwrap();
        }

        deployment.start().await.unwrap();

        let first_tick = out_recv.next().await.unwrap();
        assert_eq!(first_tick, 0); // first tick (no data yet)

        let second_tick = out_recv.next().await.unwrap();
        assert_eq!(second_tick, 3); // second tick sees all three inputs
    }
}