Skip to main content

Flow Syntax

Flows consist of named operators that are connected via flow edges denoted by ->. The example below uses the source_iter operator to generate two strings from a Rust vec, the map operator to apply some Rust code to uppercase each string, and the for_each operator to print each string to stdout.

source_iter(vec!["Hello", "world"])
-> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x));

Flows can be assigned to variable names for convenience. E.g, the above can be rewritten as follows:

source_iter(vec!["Hello", "world"]) -> upper_print;
upper_print = map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x));

Note that the order of the statements (lines) doesn't matter. In this example, upper_print is referenced before it is assigned, and that is completely OK and better matches the flow of data, making the program more understandable.

Operators with Multiple Ports

Some operators have more than one input port that can be referenced by ->. For example union unions the contents of many flows, so it can have an abitrary number of input ports. Some operators have multiple outputs; tee and demux have an arbitrary number of outputs.

In the syntax, we optionally distinguish input ports via an indexing prefix string in square brackets before the name (e.g. [0]my_union and [1]my_union). Most operators with a fixed number of input ports\ require specific indexing prefixes to distinguish the inputs. For example, the inputs to join must be [0] and [1]; the inputs to difference must be [pos] and [neg]. Operators with an arbitrary number of inputs (union) and outputs (demux, tee) allow you to choose arbitrary strings, which help you make your code and dataflow graphs more readable and understandable (e.g. my_tee[print] and my_tee[continue]).

Here is an example that tees one flow into two, handles each separately, and then unions them to print out the contents in both lowercase and uppercase:

my_tee = source_iter(vec!["Hello", "world"]) -> tee();
my_tee -> map(|x| x.to_uppercase()) -> [low_road]my_union;
my_tee -> map(|x| x.to_lowercase()) -> [high_road]my_union;
my_union = union() -> for_each(|x| println!("{}", x));

Here is a visualization of the flow that was generated. Note that the outbound labels to my_tee were auto-generated, but the inbound labels to my_union were specified by the code above:

DFIR compiled this flow into two subgraphs called compiled components, connected by handoffs. You can ignore these details unless you are interested in low-level performance tuning; they are explained in the discussion of in-out trees.

The context object

Closures inside surface syntax operators have access to a special context object which provides access to scheduling, timing, and state APIs. The object is accessible as a shared reference (&Context) via the special name context. Here is the full API documentation for Context.

source_iter([()])
-> for_each(|()| println!("Current tick: {}, stratum: {}", context.current_tick(), context.current_stratum()));

Output:

Current tick: [0], stratum: 0