hydro_test/local/
graph_reachability.rs1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::prelude::*;
3
4pub fn graph_reachability<'a>(
5 roots: Stream<u32, Process<'a>, Unbounded>,
6 edges: Stream<(u32, u32), Process<'a>, Unbounded>,
7) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8 let spinner = roots.location().spin();
9
10 sliced! {
11 let mut reached = use::state_null::<Stream<_, _, _, NoOrder>>();
12 let new_roots = use(roots, nondet!());
13 let current_edges = use(edges.collect_vec(), nondet!());
14 let spin = use(spinner, nondet!());
15
16 reached = reached.chain(new_roots);
17 let reachable = reached
18 .clone()
19 .map(q!(|r| (r, ())))
20 .join(current_edges.flatten_ordered())
21 .map(q!(|(_from, (_, to))| to));
22
23 reached = reached.chain(reachable);
24
25 reached.clone().cross_singleton(spin.count()).map(q!(|(v, _)| v)) }.unique()
27}
28
29#[cfg(test)]
30mod tests {
31 use futures::{SinkExt, StreamExt};
32 use hydro_deploy::Deployment;
33 use hydro_lang::location::Location;
34
35 #[tokio::test]
36 async fn test_reachability() {
37 let mut deployment = Deployment::new();
38
39 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
40 let external = builder.external::<()>();
41 let p1 = builder.process();
42
43 let (roots_send, roots) = p1.source_external_bincode(&external);
44 let (edges_send, edges) = p1.source_external_bincode(&external);
45 let out = super::graph_reachability(roots, edges);
46 let out_recv = out.send_bincode_external(&external);
47
48 let mut built = builder.with_default_optimize();
49
50 println!(
51 "{}",
52 built
53 .preview_compile()
54 .dfir_for(&p1)
55 .surface_syntax_string()
56 );
57
58 let nodes = built
59 .with_process(&p1, deployment.Localhost())
60 .with_external(&external, deployment.Localhost())
61 .deploy(&mut deployment);
62
63 deployment.deploy().await.unwrap();
64
65 let mut roots_send = nodes.connect(roots_send).await;
66 let mut edges_send = nodes.connect(edges_send).await;
67 let out_recv = nodes.connect(out_recv).await;
68
69 deployment.start().await.unwrap();
70
71 roots_send.send(1).await.unwrap();
72 roots_send.send(2).await.unwrap();
73
74 edges_send.send((1, 2)).await.unwrap();
75 edges_send.send((2, 3)).await.unwrap();
76 edges_send.send((3, 4)).await.unwrap();
77 edges_send.send((4, 5)).await.unwrap();
78
79 assert_eq!(out_recv.take(5).collect::<Vec<_>>().await, &[1, 2, 3, 4, 5]);
80 }
81}