hydro_test/local/
graph_reachability.rs

1use hydro_lang::*;
2
3pub fn graph_reachability<'a>(
4    process: &Process<'a>,
5    roots: Stream<u32, Process<'a>, Unbounded>,
6    edges: Stream<(u32, u32), Process<'a>, Unbounded>,
7) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8    let reachability_tick = process.tick();
9    let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::<Stream<_, _, _, NoOrder>>();
10
11    let reached = roots
12        .batch(
13            &reachability_tick,
14            nondet!(/** roots can be inserted on any tick because we are fixpointing */),
15        )
16        .chain(reached_cycle);
17    let reachable = reached
18        .clone()
19        .map(q!(|r| (r, ())))
20        .join(
21            edges
22                .batch(
23                    &reachability_tick,
24                    nondet!(/** edges can be inserted on any tick because we are fixpointing */),
25                )
26                .persist(),
27        )
28        .map(q!(|(_from, (_, to))| to));
29    set_reached_cycle.complete_next_tick(reached.clone().chain(reachable));
30
31    reached.all_ticks().unique()
32}
33
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::Location;

    /// Deploys `graph_reachability` on a single localhost process, feeds it two
    /// roots and a four-edge chain through external bincode ports, and expects
    /// the first five outputs to be the vertices 1 through 5.
    ///
    /// Currently ignored; see the reason on the `#[ignore]` attribute.
    #[tokio::test]
    #[ignore = "broken because ticks in Hydro are only triggered by external input"]
    async fn test_reachability() {
        let mut deployment = Deployment::new();

        // Describe the flow: one process plus an external endpoint for I/O.
        let flow = hydro_lang::FlowBuilder::new();
        let external = flow.external::<()>();
        let p1 = flow.process();

        let (roots_port, roots) = p1.source_external_bincode(&external);
        let (edges_port, edges) = p1.source_external_bincode(&external);
        let out_port = super::graph_reachability(&p1, roots, edges).send_bincode_external(&external);

        let built = flow.with_default_optimize();

        // Print the generated DFIR for the process to aid debugging.
        let dfir_text = built
            .preview_compile()
            .dfir_for(&p1)
            .surface_syntax_string();
        println!("{}", dfir_text);

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external ports before starting the deployment.
        let mut roots_sink = nodes.connect_sink_bincode(roots_port).await;
        let mut edges_sink = nodes.connect_sink_bincode(edges_port).await;
        let out_stream = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for root in [1, 2] {
            roots_sink.send(root).await.unwrap();
        }

        for edge in [(1, 2), (2, 3), (3, 4), (4, 5)] {
            edges_sink.send(edge).await.unwrap();
        }

        let received = out_stream.take(5).collect::<Vec<_>>().await;
        assert_eq!(received, &[1, 2, 3, 4, 5]);
    }
}