Your First Dataflow
Let's look at a minimal example of a Hydroflow+ program. We'll start with a simple dataflow that prints out the first 10 natural numbers.
We recommend using the Hydroflow+ template to get started with a new project:
cargo install cargo-generate
cargo generate gh:hydro-project/hydroflow template/hydroflow_plus
Writing a Dataflow
In Hydroflow+, streams are attached to a location, which is either a virtual handle to a single machine (the Process type) or a set of machines (the Cluster type). A single piece of Hydroflow+ code can describe a distributed program that runs across multiple processes and clusters, each with its own local state and data.
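For intuition, here is a minimal sketch (hypothetical, but consistent with the FlowBuilder API used in the deployment script later in this section; the () type tags are illustrative) of how the two kinds of locations are obtained:

use hydroflow_plus::*;

// Hypothetical sketch: a single FlowBuilder hands out both kinds of
// locations. The `()` tags are illustrative type parameters.
pub fn locations(flow: &FlowBuilder) {
    let process = flow.process::<()>(); // a handle to a single machine
    let cluster = flow.cluster::<()>(); // a handle to a set of machines
    // Streams materialized on `process` live on that one machine; streams
    // on `cluster` are replicated across its members.
}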
We'll write our first dataflow in src/first_ten.rs. This program will run on a single machine, so we take a single &Process parameter. We can materialize a stream on this machine using process.source_iter (which emits values from a static in-memory collection), and then print out the values using for_each.
use hydroflow_plus::*;

pub fn first_ten(process: &Process) {
    process
        .source_iter(q!(0..10))
        .for_each(q!(|n| println!("{}", n)));
}
You'll notice that the arguments to source_iter and for_each are wrapped in q! macros. The top-level Hydroflow+ program (first_ten) is responsible for setting up the dataflow structure, whereas the q! macro marks the Rust code that will be executed at runtime. Generally, runtime code in a q! macro is a snippet of Rust code that defines a static source of data or a closure.
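For example, here is a hypothetical variant of the pipeline above with an extra map stage; every piece of runtime code, whether a data source or a closure, is quoted with q!:

use hydroflow_plus::*;

// Hypothetical variant of first_ten: q! quotes the static data source
// (0..10) as well as each closure that runs at runtime.
pub fn first_ten_doubled(process: &Process) {
    process
        .source_iter(q!(0..10))      // runtime data source, quoted
        .map(q!(|n| n * 2))          // runtime closure, quoted
        .for_each(q!(|n| println!("{}", n)));
}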
If you forget to wrap a block in q! when that is required, you'll see an error like:
closure is expected to take 5 arguments, but it takes X arguments
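For instance, a hypothetical broken variant of our pipeline that omits q! around the closure:

// Does NOT compile: the closure passed to for_each must be quoted with q!.
process
    .source_iter(q!(0..10))
    .for_each(|n| println!("{}", n)); // the error points at this closure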
Running the Dataflow
To run a Hydroflow+ program, we need to write some deployment configuration in examples/first_ten.rs.

When using Hydroflow+, we will always place our deployment scripts in the examples directory. This is required because deployment is done via Hydro Deploy, which is a dev dependency: it is not part of the dependencies used for generating binaries, but it is available to programs in the examples directory.
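Concretely, the split looks something like this sketch of the template's Cargo.toml (crate versions are illustrative, not pinned to match the real template):

[dependencies]
hydroflow_plus = "0.9" # illustrative version

[dev-dependencies]
# Compiled only for tests and programs under examples/, never for release binaries.
hydro_deploy = "0.9" # illustrative version
tokio = { version = "1", features = ["full"] }

With that in place, the deployment script itself looks like this: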
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydroflow_plus::FlowBuilder::new();
    let process = flow.process();
    hydroflow_plus_template::first_ten::first_ten(&process);

    let _nodes = flow
        .with_process(&process, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}
First, we initialize a new Hydro Deploy deployment with Deployment::new(). Then, we create a FlowBuilder, which will store the entire dataflow program and manage its compilation.

To create a Process, we call flow.process(). After the dataflow has been created (by invoking the hydroflow_plus_template::first_ten::first_ten function we wrote earlier), we must map each instantiated Process to a deployment target using flow.with_process (in this case, we deploy to localhost).

Finally, we call flow.deploy(&mut deployment) to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the _nodes variable to prevent them from being dropped. Then we can start the dataflow program and block until Ctrl-C using deployment.run_ctrl_c().
We can then launch the program using the following command:
cargo run --example first_ten
[() (process 0)] 0
[() (process 0)] 1
[() (process 0)] 2
[() (process 0)] 3
[() (process 0)] 4
[() (process 0)] 5
[() (process 0)] 6
[() (process 0)] 7
[() (process 0)] 8
[() (process 0)] 9
In the next section, we will look at how to distribute this program across multiple processes.