Adding Distribution

Continuing from our previous example, we will now look at how to deploy our program to run on two processes.

We'll start by updating our dataflow function signature to take two processes (in a new file, src/first_ten_distributed.rs). At this point, we need to add a lifetime parameter 'a, which represents the lifetime of data referenced by our dataflow logic. This lifetime must be the same across all processes, so it cannot be elided.

src/first_ten_distributed.rs
use hydro_lang::*;

pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>)

The Hydro template contains only the final version of this program. To follow along with the tutorial, we recommend overwriting src/first_ten_distributed.rs with each snippet as it is introduced.

Now, we'll use a new API, send_bincode, to establish a network connection between our processes (bincode is the serialization format we are using). Given a stream on process p1, we can send the data to p2 by calling .send_bincode(p2), which returns a stream on p2. Making our program distributed therefore takes only a single-line change.

src/first_ten_distributed.rs
pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) {
    p1.source_iter(q!(0..10))
        .send_bincode(p2)
        .for_each(q!(|n| println!("{}", n)));
}
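
To make the serialize, send, deserialize pattern behind send_bincode concrete, here is a minimal sketch that uses only the Rust standard library. It is not how Hydro implements networking: two threads stand in for p1 and p2, an in-memory channel stands in for the network link, and a fixed little-endian byte encoding stands in for bincode. The names encode, decode, and run_pipeline are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Encode one value as bytes (a stand-in for bincode serialization).
fn encode(n: i32) -> [u8; 4] {
    n.to_le_bytes()
}

/// Decode bytes back into a value on the receiving side.
fn decode(bytes: [u8; 4]) -> i32 {
    i32::from_le_bytes(bytes)
}

/// Runs a two-"process" pipeline and returns what the receiver observed.
fn run_pipeline() -> Vec<i32> {
    // The channel plays the role of the network link between p1 and p2.
    let (tx, rx) = mpsc::channel::<[u8; 4]>();

    // "p1": produce 0..10, serialize each element, and send it.
    let p1 = thread::spawn(move || {
        for n in 0..10 {
            tx.send(encode(n)).expect("receiver hung up");
        }
        // `tx` is dropped when this closure returns, which closes the channel.
    });

    // "p2": receive until the channel closes, deserialize, and collect.
    let p2 = thread::spawn(move || rx.into_iter().map(decode).collect::<Vec<i32>>());

    p1.join().expect("p1 panicked");
    p2.join().expect("p2 panicked")
}
```

Calling run_pipeline() and printing each element yields the numbers 0 through 9 in order, because a channel with a single sender preserves send order. In the Hydro version, all of this plumbing is hidden behind the single .send_bincode(p2) call.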

Then, we can update our deployment script to launch both processes on localhost. Hydro Deploy will automatically handle service discovery and networking, since it knows the full network topology.

examples/first_ten_distributed.rs
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydro_lang::FlowBuilder::new();
    let p1 = flow.process();
    let p2 = flow.process();
    hydro_template::first_ten_distributed::first_ten_distributed(&p1, &p2);

    let _nodes = flow
        .with_process(&p1, deployment.Localhost())
        .with_process(&p2, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}

We can then launch the program:

cargo run --example first_ten_distributed
[() (process 1)] 0
[() (process 1)] 1
[() (process 1)] 2
[() (process 1)] 3
[() (process 1)] 4
[() (process 1)] 5
[() (process 1)] 6
[() (process 1)] 7
[() (process 1)] 8
[() (process 1)] 9

You'll notice that our logs are not particularly descriptive: they show only () as an identifier. Furthermore, our processes have the same Rust type, which makes it easy to accidentally mix up streams across machines (this is reported as an error at runtime, but a compile-time error would be better).

To fix this, we can use the optional type parameter on Process, which lets us add a "type tag" that acts as an identifier. We'll define two structs to act as these tags and use them in the function signature:

src/first_ten_distributed.rs
pub struct P1 {}
pub struct P2 {}

pub fn first_ten_distributed<'a>(p1: &Process<'a, P1>, p2: &Process<'a, P2>) {
    p1.source_iter(q!(0..10)) // : Stream<i32, Process<P1>, ...>
        .send_bincode(p2) // : Stream<i32, Process<P2>, ...>
        .for_each(q!(|n| println!("{}", n)));
}

This is the final version of our dataflow, which you will find in the Hydro template.

If you are using an IDE extension like rust-analyzer, you'll see these types attached to each stream. And if we launch the program again, we'll see much better logs:

cargo run --example first_ten_distributed
[first_ten_distributed::P2 (process 1)] 0
[first_ten_distributed::P2 (process 1)] 1
[first_ten_distributed::P2 (process 1)] 2
[first_ten_distributed::P2 (process 1)] 3
[first_ten_distributed::P2 (process 1)] 4
[first_ten_distributed::P2 (process 1)] 5
[first_ten_distributed::P2 (process 1)] 6
[first_ten_distributed::P2 (process 1)] 7
[first_ten_distributed::P2 (process 1)] 8
[first_ten_distributed::P2 (process 1)] 9
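
The type-tag technique itself can be sketched without Hydro, using only the standard library. The example below is an illustration under stated assumptions, not Hydro's implementation: TaggedStream, send_to, label, print_on_p2, and demo are hypothetical names. We also assume that a log label can be derived from the tag's type name via std::any::type_name, which would be consistent with the untagged run printing () as the identifier.

```rust
use std::any::type_name;
use std::marker::PhantomData;

/// Zero-sized marker types, mirroring the tutorial's tags.
pub struct P1 {}
pub struct P2 {}

/// Hypothetical stand-in for a stream located on the process tagged `Tag`.
pub struct TaggedStream<T, Tag> {
    items: Vec<T>,
    _location: PhantomData<Tag>,
}

impl<T, Tag> TaggedStream<T, Tag> {
    pub fn new(items: Vec<T>) -> Self {
        TaggedStream { items, _location: PhantomData }
    }

    /// Moves the stream to another location; only the tag in the type changes.
    pub fn send_to<Dest>(self) -> TaggedStream<T, Dest> {
        TaggedStream { items: self.items, _location: PhantomData }
    }

    /// Builds a log prefix from the tag's type name (assumed labeling scheme).
    pub fn label(&self) -> String {
        format!("[{}]", type_name::<Tag>())
    }
}

/// Only accepts streams that live on P2.
pub fn print_on_p2(stream: &TaggedStream<i32, P2>) -> Vec<String> {
    stream
        .items
        .iter()
        .map(|n| format!("{} {}", stream.label(), n))
        .collect()
}

/// Example wiring: start on P1, move to P2, then format the log lines.
pub fn demo() -> Vec<String> {
    let on_p1: TaggedStream<i32, P1> = TaggedStream::new((0..10).collect());
    // print_on_p2(&on_p1); // rejected at compile time: expected `P2`, found `P1`
    let on_p2 = on_p1.send_to::<P2>();
    print_on_p2(&on_p2)
}
```

Because the tags are zero-sized and stored only in PhantomData, they add no runtime cost; they exist purely so the compiler can tell a stream on P1 apart from a stream on P2.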