hydro_test_local/local/
graph_reachability.rs
1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8#[stageleft::entry]
9pub fn graph_reachability<'a>(
10 flow: FlowBuilder<'a>,
11 roots: RuntimeData<UnboundedReceiverStream<u32>>,
12 edges: RuntimeData<UnboundedReceiverStream<(u32, u32)>>,
13 reached_out: RuntimeData<&'a UnboundedSender<u32>>,
14) -> impl Quoted<'a, Dfir<'a>> {
15 let process = flow.process::<()>();
16
17 let roots = process.source_stream(roots);
18 let edges = process.source_stream(edges);
19
20 let reachability_tick = process.tick();
21 let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::<Stream<_, _, _, NoOrder>>();
22
23 let reached = unsafe {
24 roots.tick_batch(&reachability_tick).chain(reached_cycle)
26 };
27 let reachable = reached
28 .clone()
29 .map(q!(|r| (r, ())))
30 .join(unsafe {
31 edges.tick_batch(&reachability_tick).persist()
33 })
34 .map(q!(|(_from, (_, to))| to));
35 set_reached_cycle.complete_next_tick(reached.clone().chain(reachable));
36
37 reached.all_ticks().unique().for_each(q!(|v| {
38 reached_out.send(v).unwrap();
39 }));
40
41 flow.compile_no_network::<SingleProcessGraph>()
42}
43
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use dfir_rs::assert_graphvis_snapshots;
    use dfir_rs::util::{collect_ready, unbounded_channel};

    /// Runs the reachability dataflow on the chain 1 -> 2 -> 3 -> 4 -> 5 with
    /// roots 1 and 2, and checks the emitted node ids and the graph snapshots.
    #[test]
    pub fn test_reachability() {
        let (root_tx, roots) = unbounded_channel();
        let (edge_tx, edges) = unbounded_channel();
        let (out, mut out_rx) = unbounded_channel();

        let mut reachability = super::graph_reachability!(roots, edges, &out);
        assert_graphvis_snapshots!(reachability);

        for root in [1, 2] {
            root_tx.send(root).unwrap();
        }

        for edge in [(1, 2), (2, 3), (3, 4), (4, 5)] {
            edge_tx.send(edge).unwrap();
        }

        // Each tick extends the reached set by one hop. Node 5 is three hops
        // from root 2, so (assuming all inputs are batched into the first
        // tick) it only appears in the output on the fourth tick.
        for _ in 0..4 {
            reachability.run_tick();
        }

        let observed: Vec<_> = collect_ready(&mut out_rx);
        assert_eq!(&*observed, &[1, 2, 3, 4, 5]);
    }
}