Your First Dataflow

Let's look at a minimal example of a Hydroflow+ program. We'll start with a simple dataflow that prints out the first 10 natural numbers.

tip

We recommend using the Hydroflow+ template to get started with a new project:

cargo install cargo-generate
cargo generate gh:hydro-project/hydroflow template/hydroflow_plus

Writing a Dataflow

In Hydroflow+, streams are attached to a location, which is either a virtual handle to a single machine (the Process type) or to a set of machines (the Cluster type). A single piece of Hydroflow+ code can describe a distributed program that runs across multiple processes and clusters, each with its own local state and data.
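To make the two location types concrete, here is a minimal sketch; the function name locations and the explicit unit type parameters are our own illustration (the template relies on inference instead), and the two handles are simply created and discarded:

use hydroflow_plus::*;

// A sketch only: both location types are created from the same FlowBuilder.
pub fn locations(flow: &FlowBuilder) {
    let _process = flow.process::<()>(); // virtual handle to a single machine
    let _cluster = flow.cluster::<()>(); // virtual handle to a set of machines
}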

We'll write our first dataflow in src/first_ten.rs. This program will run on a single machine, so we take a single &Process parameter. We can materialize a stream on this machine using process.source_iter (which emits values from a static in-memory collection), and then print out the values using for_each.

src/first_ten.rs
use hydroflow_plus::*;

pub fn first_ten(process: &Process) {
    process
        .source_iter(q!(0..10))
        .for_each(q!(|n| println!("{}", n)));
}
caution

You'll notice that the arguments to source_iter and for_each are wrapped in q! macros. The top-level Hydroflow+ program (first_ten) is responsible for setting up the dataflow structure, whereas the q! macro is used to mark the Rust code that will be executed at runtime. Generally, runtime code in a q! macro is a snippet of Rust code that defines a static source of data or a closure.

If you forget to wrap a block in q! when that is required, you'll see an error like:

closure is expected to take 5 arguments, but it takes X arguments
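
Other operators that take runtime closures follow the same pattern. As a sketch of our own (the map step and the doubling closure are illustrative additions, not part of the template), each closure is wrapped in q! while the surrounding pipeline structure is not:

use hydroflow_plus::*;

pub fn first_ten_doubled(process: &Process) {
    process
        .source_iter(q!(0..10))  // q! marks a runtime data source
        .map(q!(|n| n * 2))      // q! marks a runtime closure
        .for_each(q!(|n| println!("{}", n)));
}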

Running the Dataflow

To run a Hydroflow+ program, we need to write some deployment configuration in examples/first_ten.rs.

tip

When using Hydroflow+, we will always place our deployment scripts in the examples directory. This is required because deployment is done via Hydro Deploy, which is a dev dependency; that is, it is not part of the dependencies used to build the release binaries, but it is available to programs in the examples directory.

examples/first_ten.rs
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydroflow_plus::FlowBuilder::new();
    let process = flow.process();
    hydroflow_plus_template::first_ten::first_ten(&process);

    let _nodes = flow
        .with_process(&process, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}

First, we initialize a new Hydro Deploy deployment with Deployment::new(). Then, we create a FlowBuilder, which will store the entire dataflow program and manage its compilation.

To create a Process, we call flow.process(). After the dataflow has been created (by invoking the hydroflow_plus_template::first_ten::first_ten function we created earlier), we must map each instantiated Process to a deployment target using flow.with_process (in this case we deploy to localhost).

Finally, we call .deploy(&mut deployment) on the resulting builder to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the _nodes variable to prevent them from being dropped. Then, we can start the dataflow program and block until Ctrl-C using deployment.run_ctrl_c().await.

We can then launch the program using the following command:

cargo run --example first_ten
[() (process 0)] 0
[() (process 0)] 1
[() (process 0)] 2
[() (process 0)] 3
[() (process 0)] 4
[() (process 0)] 5
[() (process 0)] 6
[() (process 0)] 7
[() (process 0)] 8
[() (process 0)] 9

In the next section, we will look at how to distribute this program across multiple processes.