Skip to main content

hydro_test/local/
graph_reachability.rs

1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::prelude::*;
3
4pub fn graph_reachability<'a>(
5    roots: Stream<u32, Process<'a>, Unbounded>,
6    edges: Stream<(u32, u32), Process<'a>, Unbounded>,
7) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8    let spinner = roots.location().spin();
9
10    sliced! {
11        let mut reached = use::state_null::<Stream<_, _, _, NoOrder>>();
12        let new_roots = use(roots, nondet!(/** roots can be inserted on any tick because we are fixpointing */));
13        let current_edges = use(edges.collect_vec(), nondet!(/** edges can be inserted on any tick because we are fixpointing */));
14        let spin = use(spinner, nondet!(/** force infinite loop for fixpoint */));
15
16        reached = reached.chain(new_roots);
17        let reachable = reached
18            .clone()
19            .map(q!(|r| (r, ())))
20            .join(current_edges.flatten_ordered())
21            .map(q!(|(_from, (_, to))| to));
22
23        reached = reached.chain(reachable);
24
25        reached.clone().cross_singleton(spin.count()).map(q!(|(v, _)| v)) // spin must be used to force infinite loop
26    }.unique()
27}
28
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::location::Location;

    /// End-to-end check of `graph_reachability` on the chain 1 -> 2 -> 3 -> 4 -> 5
    /// with roots `{1, 2}`: every vertex of the chain must be reported.
    #[tokio::test]
    async fn test_reachability() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        // Roots and edges are fed in from the external side; results are sent back out.
        let (roots_send, roots) = p1.source_external_bincode(&external);
        let (edges_send, edges) = p1.source_external_bincode(&external);
        let out = super::graph_reachability(roots, edges);
        let out_recv = out.send_bincode_external(&external);

        let mut built = builder.with_default_optimize();

        // Print the generated DFIR for the process to aid debugging on failure.
        println!(
            "{}",
            built
                .preview_compile()
                .dfir_for(&p1)
                .surface_syntax_string()
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut roots_send = nodes.connect(roots_send).await;
        let mut edges_send = nodes.connect(edges_send).await;
        let out_recv = nodes.connect(out_recv).await;

        deployment.start().await.unwrap();

        roots_send.send(1).await.unwrap();
        roots_send.send(2).await.unwrap();

        edges_send.send((1, 2)).await.unwrap();
        edges_send.send((2, 3)).await.unwrap();
        edges_send.send((3, 4)).await.unwrap();
        edges_send.send((4, 5)).await.unwrap();

        // The output stream is `NoOrder`, so the emission order is unspecified.
        // Sort before comparing so the test checks only the *set* of reached
        // vertices and does not depend on scheduling or join ordering.
        let mut reached = out_recv.take(5).collect::<Vec<_>>().await;
        reached.sort_unstable();
        assert_eq!(reached, &[1, 2, 3, 4, 5]);
    }
}