
Graph Neighbors

In this example we cover:

  • Assigning sub-flows to variables
  • Our first multi-input operator, join
  • Indexing multi-input operators by prepending a bracket expression
  • The unique operator for removing duplicates from a stream
  • Visualizing DFIR code via flow.meta_graph().expect(...).to_mermaid()

So far, all the operators we've used have one input and one output and therefore create a linear flow of operators. Let's now take a look at a DFIR program containing an operator which has multiple inputs; in the following examples we'll extend this to multiple outputs.

To motivate this, we are going to start out on a little project of building a flow-based algorithm for the problem of graph reachability. Given an abstract graph—represented as data in the form of a streaming list of edges—which vertices can be reached from a vertex passed in as the origin? It turns out this is fairly naturally represented as a dataflow program.

Note on terminology: In each of the next few examples, we're going to write a DFIR program (a dataflow graph) to process data that itself represents some other graph! To avoid confusion, in these examples, we'll refer to the DFIR program as a "flow" or "program", and the data as a "graph" of "edges" and "vertices".

But First: Graph Neighbors

Graph reachability exercises a bunch of concepts at once, so we'll start here with a simpler flow that finds graph neighbors: vertices that are just one hop away.

Our graph neighbors DFIR program will take our initial origin vertex as one input and join it with another input that streams in all the edges; this join will stream out the vertices that are one hop (edge) away from the starting vertex.

Here is an intuitive diagram of that dataflow program (we'll see complete, autogenerated DFIR diagrams below):

Let's take a look at some DFIR code that implements the program. In your simple project, replace the contents of src/main.rs with the following:

use dfir_rs::dfir_syntax;

pub fn main() {
    // An edge in the input data = a pair of `usize` vertex IDs.
    let (edges_send, edges_recv) = dfir_rs::util::unbounded_channel::<(usize, usize)>();

    let mut flow = dfir_syntax! {
        // inputs: the origin vertex (vertex 0) and stream of input edges
        origin = source_iter(vec![0]);
        stream_of_edges = source_stream(edges_recv);

        // the join
        origin -> map(|v| (v, ())) -> [0]my_join;
        stream_of_edges -> [1]my_join;
        my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]);

        // the output
        my_join -> unique() -> for_each(|n| println!("Reached: {}", n));
    };

    // Print the structure of the flow as a Mermaid diagram.
    println!(
        "{}",
        flow.meta_graph()
            .expect("No graph found, maybe failed to parse.")
            .to_mermaid(&Default::default())
    );

    // Send some edges into the flow (note that (0, 3) is sent twice).
    edges_send.send((0, 1)).unwrap();
    edges_send.send((2, 4)).unwrap();
    edges_send.send((3, 4)).unwrap();
    edges_send.send((1, 2)).unwrap();
    edges_send.send((0, 3)).unwrap();
    edges_send.send((0, 3)).unwrap();

    // Run the flow on all the data available so far.
    flow.run_available();
}

Run the program and focus on the last three lines of output, which come from flow.run_available():

cargo run
<build output>
<graph output>
Reached: 0
Reached: 1
Reached: 3

That looks right: the edges we "sent" into the flow that start at 0 are (0, 1) and (0, 3) (the latter sent twice), so the vertices reachable from 0 in 0 or 1 hops are 0, 1, and 3.
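To double-check that output without DFIR, here is a minimal plain-Rust sketch that computes the same one-hop result from the same edge list. The helper name `one_hop` is hypothetical and is not part of DFIR. Like the flow, it only emits the origin when at least one edge starts there, because the join produces nothing for an origin with no matching edges.

```rust
use std::collections::BTreeSet;

/// Plain Rust (no DFIR) model of what the neighbors flow computes:
/// for every edge starting at `origin`, emit both endpoints.
/// `one_hop` is a hypothetical helper used only for illustration.
fn one_hop(edges: &[(usize, usize)], origin: usize) -> BTreeSet<usize> {
    let mut reached = BTreeSet::new();
    for &(src, dst) in edges {
        if src == origin {
            // Mirrors flat_map(|(src, (_, dst))| [src, dst]).
            reached.insert(src);
            reached.insert(dst);
        }
    }
    // A set also mirrors the effect of unique(): duplicates collapse.
    reached
}

fn main() {
    let edges = [(0, 1), (2, 4), (3, 4), (1, 2), (0, 3), (0, 3)];
    // BTreeSet iterates in ascending order, so the output is sorted.
    for v in one_hop(&edges, 0) {
        println!("Reached: {}", v);
    }
}
```

Using a sorted set here makes the printed order deterministic, unlike the DFIR flow, whose output order is unspecified.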

Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here uses the join() operator, which deals in sets of data items, so the order in which a join()'s output items are generated is not specified or guaranteed. The sort_by_key operator can always be used to sort the output of a flow if needed.

Examining the DFIR Code

In the code, we want to start out with the origin vertex, 0, and the stream of edges coming in. Because this flow is a bit more complex than our earlier examples, we break it down into named "subflows", assigning them variable names that we can reuse. Here we specify two subflows, origin and stream_of_edges:

// inputs: the origin vertex (vertex 0) and stream of input edges
origin = source_iter(vec![0]);
stream_of_edges = source_stream(edges_recv);

The Rust syntax vec![0] constructs a vector with a single element, 0, which we iterate over using source_iter.

We then set up a join() that we name my_join, which acts like a SQL inner join.

// the join
origin -> map(|v| (v, ())) -> [0]my_join;
stream_of_edges -> [1]my_join;
my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]);

First, note that passing data into a subflow with multiple inputs requires prepending an input index (starting at 0) in square brackets to the multi-input variable name or operator. In this example we have -> [0]my_join and -> [1]my_join.

DFIR's join() API requires a little massaging of its inputs to work properly. The inputs must be pairs of the form (K, V1) and (K, V2); the operator joins them on equal keys K and produces output elements of the form (K, (V1, V2)). In this case we only want to join on the key v and have no corresponding value, so we feed origin through a map() that generates (v, ()) elements for the first join input.
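The join semantics described above can be modeled in plain Rust as a hash-based inner join. This is a sketch of the input/output shapes only, not how DFIR's join() is implemented, and the function name `inner_join` is hypothetical. It shows why origin must first be mapped to (v, ()) pairs: both inputs need a key in the first position.

```rust
use std::collections::HashMap;

/// Toy model of inner-join semantics: given (K, V1) and (K, V2) pairs,
/// emit (K, (V1, V2)) for every pair of items whose keys are equal.
/// `inner_join` is a hypothetical name; this is not DFIR's implementation.
fn inner_join<K, V1, V2>(lhs: &[(K, V1)], rhs: &[(K, V2)]) -> Vec<(K, (V1, V2))>
where
    K: Eq + std::hash::Hash + Clone,
    V1: Clone,
    V2: Clone,
{
    // Index the left input by key.
    let mut index: HashMap<K, Vec<V1>> = HashMap::new();
    for (k, v1) in lhs {
        index.entry(k.clone()).or_default().push(v1.clone());
    }
    // Probe with each right-hand item and emit one output per match.
    let mut out = Vec::new();
    for (k, v2) in rhs {
        if let Some(v1s) = index.get(k) {
            for v1 in v1s {
                out.push((k.clone(), (v1.clone(), v2.clone())));
            }
        }
    }
    out
}

fn main() {
    // Origin vertices become (v, ()) pairs, like map(|v| (v, ())) in the flow.
    let origin: Vec<(usize, ())> = vec![0].into_iter().map(|v| (v, ())).collect();
    let edges: [(usize, usize); 3] = [(0, 1), (2, 4), (0, 3)];
    for (src, ((), dst)) in inner_join(&origin, &edges) {
        println!("joined on {}: edge to {}", src, dst);
    }
}
```

The edge (2, 4) produces no output because no origin element has key 2, which is exactly the inner-join behavior the flow relies on.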

The elements of stream_of_edges are (src, dst) pairs, so the join's output elements are (src, ((), dst)), where each dst is a neighbor of the origin. The my_join statement therefore feeds the join's output through a flat_map that turns each (src, ((), dst)) element into a 2-item array [src, dst]; flat_map then flattens those arrays into a single stream of all vertices reached.
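The flat_map stage behaves like Rust's own `Iterator::flat_map`. The plain-Rust sketch below applies the same closure to a hand-written join output for origin 0; the function name `vertices_from_join_output` is hypothetical and used only for illustration.

```rust
/// Plain-Rust equivalent of the flat_map stage: each join output
/// (src, ((), dst)) becomes the array [src, dst], and flat_map
/// flattens all the arrays into one sequence of vertices.
fn vertices_from_join_output(joined: Vec<(usize, ((), usize))>) -> Vec<usize> {
    joined
        .into_iter()
        .flat_map(|(src, (_, dst))| [src, dst])
        .collect()
}

fn main() {
    // Join output for origin 0 and the edges (0, 1), (0, 3), (0, 3).
    let joined = vec![(0, ((), 1)), (0, ((), 3)), (0, ((), 3))];
    // Prints [0, 1, 0, 3, 0, 3]
    println!("{:?}", vertices_from_join_output(joined));
}
```

Note that the flattened stream still contains duplicates (0 appears three times and 3 twice), which is why the flow follows flat_map with unique().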

Note that the order of the statements (lines) doesn't matter. In this example, my_join is referenced before it is assigned; that is completely OK, and here it makes the code more readable.

Finally we print the neighbor vertices as follows:

// the output
my_join -> unique() -> for_each(|n| println!("Reached: {}", n));

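The unique operator removes duplicates from the stream to make things more readable: without it, vertex 0 would be printed once per matching edge (three times) and vertex 3 twice, because (0, 3) was sent twice. Note that unique runs in a streaming fashion, which we will talk about more later.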
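To illustrate what "streaming" deduplication means, here is a plain-Rust sketch that passes each item through the first time it is seen and drops later repeats, producing output item by item instead of waiting for the whole input. The function `unique_stream` is hypothetical; this models the behavior described above and is not DFIR's implementation of unique.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Streaming dedup sketch: an item is yielded the first time it is seen
/// and filtered out afterwards. Items are produced lazily, one at a time.
/// `unique_stream` is a hypothetical helper, not DFIR's `unique`.
fn unique_stream<T, I>(input: I) -> impl Iterator<Item = T>
where
    I: IntoIterator<Item = T>,
    T: Eq + Hash + Clone,
{
    let mut seen = HashSet::new();
    // HashSet::insert returns true only if the item was not already present.
    input.into_iter().filter(move |item| seen.insert(item.clone()))
}

fn main() {
    // The flattened join output from the neighbors example.
    let stream = vec![0, 1, 0, 3, 0, 3];
    let out: Vec<_> = unique_stream(stream).collect();
    // Prints [0, 1, 3]
    println!("{:?}", out);
}
```

The cost of streaming is memory: the `seen` set must remember every distinct item so far in order to recognize later duplicates.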

There's also some extra code here, flow.meta_graph().expect(...).to_mermaid(), which tells DFIR to generate a Mermaid diagram showing the structure of the flow and print it to stdout. You can copy that text and paste it into the Mermaid Live Editor to see the diagram, which should look as follows:

You may be wondering why the nodes in the diagram have different colors (and shapes, for readers who cannot distinguish colors easily). The answer has nothing to do with the meaning of the program, only with the way that DFIR compiles operators into Rust. Simply put, blue (wide-topped) boxes pull data, yellow (wide-bottomed) boxes push data, and the handoff is a special operator that buffers pushed data for subsequent pulling. DFIR always places a handoff between a push producer and a pull consumer, for reasons explained in the Architecture chapter.
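As a rough conceptual picture of a handoff, the toy below buffers items that a push-style producer hands over and lets a pull-style consumer drain them later, in order. The `Handoff` type and its `push`/`pull_all` methods are hypothetical and are not DFIR's API; the sketch only illustrates buffering between push and pull.

```rust
/// Toy "handoff": a buffer written by a push-based producer and later
/// drained by a pull-based consumer. Hypothetical type, not DFIR's API.
struct Handoff<T> {
    buffer: Vec<T>,
}

impl<T> Handoff<T> {
    fn new() -> Self {
        Handoff { buffer: Vec::new() }
    }

    /// Push side: the producer hands over items whenever it has them.
    fn push(&mut self, item: T) {
        self.buffer.push(item);
    }

    /// Pull side: the consumer takes everything buffered so far, in order.
    fn pull_all(&mut self) -> std::vec::Drain<'_, T> {
        self.buffer.drain(..)
    }
}

fn main() {
    let mut handoff = Handoff::new();
    // A push-style producer drives items downstream as it generates them.
    for v in [0, 1, 3] {
        handoff.push(v);
    }
    // Later, a pull-style consumer asks for data when it is ready to run.
    let pulled: Vec<i32> = handoff.pull_all().collect();
    println!("{:?}", pulled);
}
```

The buffer decouples the two sides: the producer never has to wait for the consumer to be scheduled, and the consumer sees everything pushed since its last pull.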

Returning to the code, if you read the edges_send calls carefully, you'll see that the example data has vertices 2 and 4 that are more than one hop away from 0 (via 0 → 1 → 2 → 4 and 0 → 3 → 4), which were not output by our simple program. To extend this example to graph reachability, we need to recurse: find neighbors of our neighbors, neighbors of our neighbors' neighbors, and so on. In DFIR, this is done by adding a loop to the flow, as we'll see in our next example.
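To make "neighbors of neighbors, and so on" concrete before building it in DFIR, here is a plain-Rust sketch that repeatedly adds neighbors of already-reached vertices until no new vertex appears (a fixpoint). The helper `reachable` is hypothetical and is not the DFIR version; unlike the join-based flow, it always includes the origin, even when no edge starts there.

```rust
use std::collections::BTreeSet;

/// Plain-Rust reachability sketch: keep adding neighbors of reached
/// vertices until a full pass over the edges adds nothing new.
/// `reachable` is a hypothetical helper, not the DFIR implementation.
fn reachable(edges: &[(usize, usize)], origin: usize) -> BTreeSet<usize> {
    let mut reached = BTreeSet::from([origin]);
    loop {
        let mut changed = false;
        for &(src, dst) in edges {
            // insert returns true only if dst was newly added.
            if reached.contains(&src) && reached.insert(dst) {
                changed = true;
            }
        }
        if !changed {
            return reached;
        }
    }
}

fn main() {
    let edges = [(0, 1), (2, 4), (3, 4), (1, 2), (0, 3), (0, 3)];
    for v in reachable(&edges, 0) {
        println!("Reached: {}", v);
    }
}
```

On the example edges this reaches 0, 1, 2, 3, and 4, including the vertices 2 and 4 that the one-hop flow missed.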