hydro_test_local_macro/local/
graph_reachability.rs

1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8#[stageleft::entry]
9pub fn graph_reachability<'a>(
10    flow: FlowBuilder<'a>,
11    roots: RuntimeData<UnboundedReceiverStream<u32>>,
12    edges: RuntimeData<UnboundedReceiverStream<(u32, u32)>>,
13    reached_out: RuntimeData<&'a UnboundedSender<u32>>,
14) -> impl Quoted<'a, Dfir<'a>> {
15    let process = flow.process::<()>();
16
17    let roots = process.source_stream(roots);
18    let edges = process.source_stream(edges);
19
20    let reachability_tick = process.tick();
21    let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::<Stream<_, _, _, NoOrder>>();
22
23    let reached = unsafe {
24        // SAFETY: roots can be inserted on any tick because we are fixpointing
25        roots.tick_batch(&reachability_tick).chain(reached_cycle)
26    };
27    let reachable = reached
28        .clone()
29        .map(q!(|r| (r, ())))
30        .join(unsafe {
31            // SAFETY: edges can be inserted on any tick because we are fixpointing
32            edges.tick_batch(&reachability_tick).persist()
33        })
34        .map(q!(|(_from, (_, to))| to));
35    set_reached_cycle.complete_next_tick(reached.clone().chain(reachable));
36
37    reached.all_ticks().unique().for_each(q!(|v| {
38        reached_out.send(v).unwrap();
39    }));
40
41    flow.compile_no_network::<SingleProcessGraph>()
42}
43
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use dfir_rs::assert_graphvis_snapshots;
    use dfir_rs::util::{collect_ready, unbounded_channel};

    /// Feeds two roots and a four-edge chain into the staged graph, runs four
    /// ticks, and checks that vertices 1 through 5 are reported in order.
    #[test]
    pub fn test_reachability() {
        let (roots_send, roots) = unbounded_channel();
        let (edges_send, edges) = unbounded_channel();
        let (out, mut out_recv) = unbounded_channel();

        let mut reachability = super::graph_reachability!(roots, edges, &out);
        assert_graphvis_snapshots!(reachability);

        for root in [1, 2] {
            roots_send.send(root).unwrap();
        }

        for edge in [(1, 2), (2, 3), (3, 4), (4, 5)] {
            edges_send.send(edge).unwrap();
        }

        // Each tick extends the reached set by one hop along the chain.
        for _ in 0..4 {
            reachability.run_tick();
        }

        let reached: Vec<u32> = collect_ready(&mut out_recv);
        assert_eq!(reached, [1, 2, 3, 4, 5]);
    }
}
77}