Your First Dataflow

Hydroflow+ programs require special structure to support code generation and distributed deployments. There are three main components of a Hydroflow+ program:

  • The flow graph describes the dataflow logic of the program.
  • The runtime wraps the dataflow in an executable Rust binary.
  • The deployment describes how to map the flow graph to instances of the runtime. This is only needed for distributed deployments.

We recommend using the Hydroflow+ template to get started with a new project. The template comes with a pre-configured build system and the following example pre-implemented.

cargo install cargo-generate
cargo generate hydro-project/hydroflow-plus-template

Let's look at a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the flow graph.

The Flow Graph

use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
    flow: &FlowBuilder<'a, D>,
    process_spec: &impl ProcessSpec<'a, D>
) {}

To build a Hydroflow+ application, we need to define a dataflow that spans multiple processes. The FlowBuilder parameter captures the global dataflow, while the process_spec variable defines how to construct the processes where the dataflow will be executed. For now, we will only use the ProcessSpec once, to add a single process to our global dataflow.

pub fn first_ten<'a, D: LocalDeploy<'a>>(
    flow: &FlowBuilder<'a, D>,
    process_spec: &impl ProcessSpec<'a, D>
) {
    let process = flow.process(process_spec);

Now, we can build out the dataflow to run on this process. Every dataflow starts at a source that is bound to a specific process. First, we instantiate a stream that emits the first 10 natural numbers.

let numbers = flow.source_iter(&process, q!(0..10)); // : Stream<_, i32, _, _>

In Hydroflow+, whenever there are snippets of Rust code passed to operators (like source_iter, map, or for_each), we use the q! macro to mark them. For example, we may use Rust snippets to define static sources of data or closures that transform them.

To print out these numbers, we can use the for_each operator (note that the body of for_each is a closure wrapped in q!):

numbers.for_each(q!(|n| println!("{}", n)));
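As a rough mental model only (this is plain Rust, not Hydroflow+ code), the single-process dataflow above computes the same values as an ordinary iterator pipeline over 0..10, that is, the numbers 0 through 9. The helper names first_ten_numbers and render_lines are hypothetical stand-ins for the source_iter and for_each steps:

```rust
/// Hypothetical stand-in for the `source_iter` step: the values the stream emits.
fn first_ten_numbers() -> Vec<i32> {
    // `0..10` is a half-open range, so it yields 0 through 9.
    (0..10).collect()
}

/// Hypothetical stand-in for the `for_each` step: format each number
/// as the line that the closure would print.
fn render_lines(numbers: &[i32]) -> Vec<String> {
    numbers.iter().map(|n| format!("{}", n)).collect()
}

fn main() {
    // Print one number per line, mirroring the body of the `for_each` closure.
    for line in render_lines(&first_ten_numbers()) {
        println!("{}", line);
    }
}
```

The difference in Hydroflow+ is that the range and the closure are wrapped in q!, so they are captured as code for the generated program rather than executed where they are written.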

The Runtime

Next, we need to instantiate our dataflow into a runnable Rust binary. We do this by defining a Stageleft entrypoint for the graph, and then invoking the entrypoint inside a separate Rust binary.

To define the entrypoint, we use the #[stageleft::entry] macro, which takes the graph being built and returns a generated Hydroflow program. We define this as a new function first_ten_runtime. In this first example, we assume there is a single process so that we do not need to specify how the graph is laid out across processes. We specify this by using the SingleProcessGraph type parameter on FlowBuilder.

Having done that, we can use some simple defaults for "distributing" this single-process deployment. First, we use () as an argument to first_ten to choose the default process spec. Then we use the optimize_default method to generate the Hydroflow program with default optimizations.

#[stageleft::entry]
pub fn first_ten_runtime<'a>(
    flow: FlowBuilder<'a, SingleProcessGraph>
) -> impl Quoted<'a, Hydroflow<'a>> {
    first_ten(&flow, &() /* for a single process graph */);
    flow.extract().optimize_default() // : impl Quoted<'a, Hydroflow<'a>>
}

Finally, it's time to write our main function. Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file src/bin/ with the following contents. Note that Hydroflow+ requires that we use tokio and its async function specification:

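#[tokio::main]
async fn main() {
    // Invoke the first_ten_runtime entrypoint here through its Stageleft-generated macro.
}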

We can now run this binary to see the output of our dataflow:

cargo run -p flow --bin first_ten

In the next section, we will look at how to extend this program to run on multiple processes.