hydro_test/local/
graph_reachability.rs1use hydro_lang::*;
2
3pub fn graph_reachability<'a>(
4 process: &Process<'a>,
5 roots: Stream<u32, Process<'a>, Unbounded>,
6 edges: Stream<(u32, u32), Process<'a>, Unbounded>,
7) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8 let reachability_tick = process.tick();
9 let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::<Stream<_, _, _, NoOrder>>();
10
11 let reached = roots
12 .batch(
13 &reachability_tick,
14 nondet!(),
15 )
16 .chain(reached_cycle);
17 let reachable = reached
18 .clone()
19 .map(q!(|r| (r, ())))
20 .join(
21 edges
22 .batch(
23 &reachability_tick,
24 nondet!(),
25 )
26 .persist(),
27 )
28 .map(q!(|(_from, (_, to))| to));
29 set_reached_cycle.complete_next_tick(reached.clone().chain(reachable));
30
31 reached.all_ticks().unique()
32}
33
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::Location;

    /// Deploys `graph_reachability` on a localhost process, feeds it two roots and a
    /// four-edge chain `1 -> 2 -> 3 -> 4 -> 5`, and expects all five nodes back.
    #[tokio::test]
    #[ignore = "broken because ticks in Hydro are only triggered by external input"]
    async fn test_reachability() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (roots_send, roots) = p1.source_external_bincode(&external);
        let (edges_send, edges) = p1.source_external_bincode(&external);
        let out = super::graph_reachability(&p1, roots, edges);
        let out_recv = out.send_bincode_external(&external);

        let built = builder.with_default_optimize();

        // Print the generated DFIR for the process to aid debugging of the flow.
        println!(
            "{}",
            built
                .preview_compile()
                .dfir_for(&p1)
                .surface_syntax_string()
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the external endpoints before starting the deployment.
        let mut roots_send = nodes.connect_sink_bincode(roots_send).await;
        let mut edges_send = nodes.connect_sink_bincode(edges_send).await;
        let out_recv = nodes.connect_source_bincode(out_recv).await;

        deployment.start().await.unwrap();

        roots_send.send(1).await.unwrap();
        roots_send.send(2).await.unwrap();

        edges_send.send((1, 2)).await.unwrap();
        edges_send.send((2, 3)).await.unwrap();
        edges_send.send((3, 4)).await.unwrap();
        edges_send.send((4, 5)).await.unwrap();

        // The stream under test is `NoOrder`, so compare the results as a sorted
        // set instead of relying on the order in which nodes happen to arrive.
        let mut reached = out_recv.take(5).collect::<Vec<_>>().await;
        reached.sort_unstable();
        assert_eq!(reached, [1, 2, 3, 4, 5]);
    }
}