
Graph Un-Reachability

In this example we cover:

  • Extending a program with additional downstream logic.
  • Hydroflow's difference operator
  • A first exposure to the concepts of strata and ticks
  • An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.

Our next example builds on the previous one by finding the vertices that are not reachable from the origin. To do this, we need to capture the set all_vertices, and use a difference operator to form the difference between that set of vertices and reached_vertices.

Essentially we want a flow like this:

This is a simple augmentation of our previous example. Replace the contents of src/ with the following:

use hydroflow::hydroflow_syntax;

pub fn main() {
    // An edge in the input data = a pair of `usize` vertex IDs.
    let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

    let mut flow = hydroflow_syntax! {
        // inputs: the origin vertex (vertex 0) and stream of input edges
        origin = source_iter(vec![0]);
        stream_of_edges = source_stream(pairs_recv) -> tee();

        // the join for reachable vertices
        reached_vertices[0] -> map(|v| (v, ())) -> [0]my_join;
        stream_of_edges[1] -> [1]my_join;
        my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]);

        // the cycle: my_join gets data from reached_vertices
        // and provides data back to reached_vertices!
        origin -> [base]reached_vertices;
        my_join -> [cycle]reached_vertices;
        reached_vertices = union() -> tee();

        // the difference: all_vertices - reached_vertices
        all_vertices = stream_of_edges[0]
            -> flat_map(|(src, dst)| [src, dst]) -> tee();
        all_vertices[0] -> [pos]unreached_vertices;
        reached_vertices[1] -> [neg]unreached_vertices;
        unreached_vertices = difference();

        // the output
        all_vertices[1] -> unique() -> for_each(|v| println!("Received vertex: {}", v));
        unreached_vertices -> for_each(|v| println!("unreached_vertices vertex: {}", v));
    };

    // Print the auto-generated Mermaid graph.
    // Note: the argument to `to_mermaid` depends on the Hydroflow version;
    // older releases take no argument.
    println!(
        "{}",
        flow.meta_graph()
            .expect("No graph found, maybe failed to parse.")
            .to_mermaid(&Default::default())
    );

    pairs_send.send((5, 10)).unwrap();
    pairs_send.send((0, 3)).unwrap();
    pairs_send.send((3, 6)).unwrap();
    pairs_send.send((6, 5)).unwrap();
    pairs_send.send((11, 12)).unwrap();

    // Run the flow on the edges sent so far.
    flow.run_available();
}

Notice that we are now sending in some new pairs to test this code. The output should be:

cargo run
<build output>
<graph output>
Received vertex: 5
Received vertex: 10
Received vertex: 0
Received vertex: 3
Received vertex: 6
Received vertex: 11
Received vertex: 12
unreached_vertices vertex: 11
unreached_vertices vertex: 12

Let's review the changes, all of which come at the end of the program. First, we remove the code that printed reached_vertices. Then we define all_vertices to be the vertices that appear in any edge (using the familiar flat_map code from the previous examples). In the last few lines, we wire up a difference operator to compute the difference between all_vertices and reached_vertices; note how all_vertices feeds the pos input and reached_vertices feeds the neg input of the difference operator. Finally, we print both all_vertices and unreached_vertices.
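To check the expected output by hand, here is a minimal sketch of the same set arithmetic in plain Rust, using only the standard library rather than Hydroflow. The helper names all_vertices and difference are our own, chosen to mirror the flow, and the reached set is worked out by hand for this particular input rather than computed.

```rust
use std::collections::BTreeSet;

/// Every vertex that appears as the source or destination of some edge,
/// mirroring the `flat_map(|(src, dst)| [src, dst])` step in the flow.
/// (Hypothetical helper, not a Hydroflow API.)
fn all_vertices(edges: &[(usize, usize)]) -> BTreeSet<usize> {
    edges.iter().flat_map(|&(src, dst)| [src, dst]).collect()
}

/// Set difference: everything in `pos` that does not appear in `neg`.
/// (Hypothetical helper, not a Hydroflow API.)
fn difference(pos: &BTreeSet<usize>, neg: &BTreeSet<usize>) -> BTreeSet<usize> {
    pos.difference(neg).copied().collect()
}

fn main() {
    let edges = [(5, 10), (0, 3), (3, 6), (6, 5), (11, 12)];

    // Reached set for this input, worked out by hand starting from vertex 0:
    // 0 -> 3 -> 6 -> 5 -> 10.
    let reached = BTreeSet::from([0, 3, 5, 6, 10]);

    let unreached = difference(&all_vertices(&edges), &reached);
    println!("{:?}", unreached); // prints {11, 12}
}
```

The result agrees with the two unreached_vertices lines in the output above.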

The auto-generated Mermaid graph looks like so:

Strata and Ticks

Notice in the Mermaid graph how Hydroflow separates the difference operator and its downstream dependencies into its own stratum (plural: strata). Note also that the edge coming into the neg input to difference is bold and ends in a ball: this is because that input to difference is "blocking", meaning that difference should not run until all of the input on that edge has been received. The stratum boundary before difference ensures that the blocking property is respected.

Hydroflow runs each stratum in order, one at a time, ensuring all values are computed before moving on to the next stratum. Between strata we see a handoff, which logically buffers the output of the first stratum and delineates the separation of execution between the two strata.

If you look carefully, you'll see two subgraphs labeled with stratum 0. The reason that stratum 0 was broken into subgraphs has nothing to do with correctness, but rather with the way that Hydroflow graphs are compiled and scheduled (as discussed in the section on In-Out Trees). We need not concern ourselves with this detail other than to look carefully at the stratum labels on the grey boxes in our Mermaid diagrams.

All the subgraphs labeled stratum 0 are run first to completion, and then all the subgraphs labeled stratum 1 are run. This captures the requirements of the difference operator: it has to wait for its full negative input before it can start producing output. Note how the difference operator has two inputs (labeled pos and neg), and only the neg input shows up as blocking (with the bold edge ending in a ball).
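To see why the neg input has to block, consider a toy model in plain Rust (not Hydroflow code) of a difference that emits each pos item as soon as it arrives. The Input enum and both functions are hypothetical names introduced only for this illustration, and the arrival order is an assumption chosen to make the problem visible.

```rust
use std::collections::HashSet;

/// Which input of the difference an item arrives on (illustrative type).
#[derive(Clone, Copy)]
enum Input {
    Pos(usize),
    Neg(usize),
}

/// Incorrect, non-blocking variant: emits a `pos` item immediately
/// if it has not been seen on `neg` so far.
fn eager_difference(arrivals: &[Input]) -> Vec<usize> {
    let mut neg_seen = HashSet::new();
    let mut out = Vec::new();
    for &item in arrivals {
        match item {
            Input::Neg(v) => {
                neg_seen.insert(v);
            }
            Input::Pos(v) => {
                if !neg_seen.contains(&v) {
                    out.push(v);
                }
            }
        }
    }
    out
}

/// Blocking variant: collects all of `neg` first, then filters `pos`.
fn blocking_difference(arrivals: &[Input]) -> Vec<usize> {
    let neg: HashSet<usize> = arrivals
        .iter()
        .filter_map(|i| match i {
            Input::Neg(v) => Some(*v),
            _ => None,
        })
        .collect();
    arrivals
        .iter()
        .filter_map(|i| match i {
            Input::Pos(v) if !neg.contains(v) => Some(*v),
            _ => None,
        })
        .collect()
}

fn main() {
    // Vertex 5 shows up on `pos` before the reachability loop reports it on `neg`.
    let arrivals = [Input::Pos(5), Input::Pos(11), Input::Neg(0), Input::Neg(5)];
    println!("eager: {:?}", eager_difference(&arrivals)); // prints eager: [5, 11]
    println!("blocking: {:?}", blocking_difference(&arrivals)); // prints blocking: [11]
}
```

The eager variant wrongly reports vertex 5 as unreached, because it cannot know that 5 will appear on neg later. Waiting for the complete neg input avoids this.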

In this Mermaid graph, note that stratum 0 has a recursive loop back through my_join, and tees off output to the difference operator in stratum 1 via the handoff and the blocking neg input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are passed to the handoff (a fixpoint), before moving on to compute the unreached vertices via stratum 1.
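The two-phase execution described above can be sketched in plain Rust. This is a simplified model under our own assumptions, not how Hydroflow is implemented: stratum_0, stratum_1, and run_strata are hypothetical function names, and the handoff is modeled as an ordinary set passed from one function to the next.

```rust
use std::collections::BTreeSet;

/// Stratum 0 (toy model): repeat the reachability step until no new vertex
/// is found (a fixpoint), then return everything that lands in the handoff.
fn stratum_0(origin: usize, edges: &[(usize, usize)]) -> BTreeSet<usize> {
    let mut reached = BTreeSet::from([origin]);
    loop {
        let before = reached.len();
        for &(src, dst) in edges {
            if reached.contains(&src) {
                reached.insert(dst);
            }
        }
        if reached.len() == before {
            return reached; // nothing new in this pass: fixpoint
        }
    }
}

/// Stratum 1 (toy model): starts only after stratum 0 has finished, so the
/// handoff already holds the complete set of reached vertices.
fn stratum_1(edges: &[(usize, usize)], handoff: &BTreeSet<usize>) -> Vec<usize> {
    let all: BTreeSet<usize> = edges.iter().flat_map(|&(src, dst)| [src, dst]).collect();
    all.difference(handoff).copied().collect()
}

/// Run the strata in order: stratum 0 to completion, then stratum 1.
fn run_strata(edges: &[(usize, usize)]) -> Vec<usize> {
    let handoff = stratum_0(0, edges);
    stratum_1(edges, &handoff)
}

fn main() {
    let edges = [(5, 10), (0, 3), (3, 6), (6, 5), (11, 12)];
    println!("unreached: {:?}", run_strata(&edges)); // prints unreached: [11, 12]
}
```

Note that the edge (5, 10) comes first in the input but only contributes vertex 10 on the second pass of the loop, after vertex 5 has been reached. This is why stratum 0 must run to a fixpoint before stratum 1 reads the handoff.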

After all strata are run, Hydroflow returns to stratum 0; this begins the next tick. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on time.