Graph Un-Reachability
In this example we cover:
- Extending a program with additional downstream logic.
- DFIR's (
difference
) operator- A first exposure to the concepts of strata and ticks
- An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.
Our next example builds on the previous by finding vertices that are not reachable. To do this, we need to capture the set all_vertices
, and use a difference operator to form the difference between that set of vertices and reachable_vertices
.
Essentially we want a flow like this:
This is a simple augmentation of our previous example. Replace the contents of src/main.rs
with the following:
use dfir_rs::dfir_syntax;
pub fn main() {
// An edge in the input data = a pair of `usize` vertex IDs.
let (pairs_send, pairs_recv) = dfir_rs::util::unbounded_channel::<(usize, usize)>();
let mut flow = dfir_syntax! {
// inputs: the origin vertex (vertex 0) and stream of input edges
origin = source_iter(vec![0]);
stream_of_edges = source_stream(pairs_recv) -> tee();
// the join for reachable vertices
reached_vertices[0] -> map(|v| (v, ())) -> [0]my_join;
stream_of_edges[1] -> [1]my_join;
my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]);
// the cycle: my_join gets data from reached_vertices
// and provides data back to reached_vertices!
origin -> [base]reached_vertices;
my_join -> [cycle]reached_vertices;
reached_vertices = union()->tee();
// the difference: all_vertices - reached_vertices
all_vertices = stream_of_edges[0]
-> flat_map(|(src, dst)| [src, dst]) -> tee();
all_vertices[0] -> [pos]unreached_vertices;
reached_vertices[1] -> [neg]unreached_vertices;
unreached_vertices = difference();
// the output
all_vertices[1] -> unique() -> for_each(|v| println!("Received vertex: {}", v));
unreached_vertices -> for_each(|v| println!("unreached_vertices vertex: {}", v));
};
println!(
"{}",
flow.meta_graph()
.expect("No graph found, maybe failed to parse.")
.to_mermaid(&Default::default())
);
pairs_send.send((5, 10)).unwrap();
pairs_send.send((0, 3)).unwrap();
pairs_send.send((3, 6)).unwrap();
pairs_send.send((6, 5)).unwrap();
pairs_send.send((11, 12)).unwrap();
flow.run_available();
}
Notice that we are now sending in some new pairs to test this code. The output should be:
cargo run
<build output>
<graph output>
Received vertex: 5
Received vertex: 10
Received vertex: 0
Received vertex: 3
Received vertex: 6
Received vertex: 11
Received vertex: 12
unreached_vertices vertex: 11
unreached_vertices vertex: 12
Let's review the changes, all of which come at the end of the program. First,
we remove code to print reached_vertices
. Then we define all_vertices
to be
the vertices that appear in any edge (using familiar flat_map
code from the previous
examples.) In the last few lines, we wire up a
difference operator
to compute the difference between all_vertices
and reached_vertices
; note
how we wire up the pos
and neg
inputs to the difference
operator!
Finally we print both all_vertices
and unreached_vertices
.
The auto-generated mermaid looks like so:
Strata and Ticks
Notice in the mermaid graph how DFIR separates the difference
operator and its downstream dependencies into its own
stratum (plural: strata). Note also the edge coming into the neg
input to difference
is bold and ends in a ball: this is because that input to difference
is
"blocking", meaning that difference
should not run until all of the input on that edge has been received.
The stratum boundary before difference
ensures that the blocking property is respected.
DFIR runs each stratum in order, one at a time, ensuring all values are computed before moving on to the next stratum. Between strata we see a handoff, which logically buffers the output of the first stratum, and delineates the separation of execution between the 2 strata.
If you look carefully, you'll see two subgraphs labeled with stratum 0
. The reason that stratum 0 was broken into subgraphs has nothing to do with
correctness, but rather the way that DFIR graphs are compiled and scheduled (as
discussed in the section on In-Out Trees. We need not concern ourselves with this detail other than to look carefully at the stratum
labels on the grey boxes in our Mermaid diagrams.
All the subgraphs labeled stratum 0
are run first to completion,
and then all the subgraphs labeled stratum 1
are run. This captures the requirements of the difference
operator: it has to wait for its full negative input before it can start producing output. Note
how the difference
operator has two inputs (labeled pos
and neg
), and only the neg
input shows up as blocking (with the bold edge ending in a ball).
In this Mermaid graph, note that stratum 0 has a recursive loop back through my_join
, and tee
s off output to the difference
operator in stratum 1 via the handoff and the blocking neg
input. This means that DFIR will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are passed to the handoff (a fixpoint), before moving on to compute the unreached vertices via stratum 1.
After all strata are run, DFIR returns to stratum 0; this begins the next tick. This doesn't really matter for this example, but it is important for long-running DFIR services that accept input from the outside world. More on this topic in the chapter on time.