Scaling with Clusters

So far, we have looked at distributed systems where each process is running a different piece of the compute graph -- compute parallelism. However, we can also use Hydro to run the same computation on multiple processes -- achieving data parallelism (e.g. partitioning). This is done by creating a cluster of processes that all run the same subgraph of code.

Dataflow with Clusters

Just like we use the Process type to represent a virtual handle to a single node, we can use the Cluster type to represent a handle to a set of nodes (with size unknown at compile-time).

A Stream located on a Cluster can be thought of as SIMD-style programming, where each cluster member executes the same operators but on different pieces of data.
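Concretely, a stream's location is tracked in its type. As a rough sketch of the distinction (type parameters simplified, with the trailing parameters elided as in the snippets below):

// One copy of the data, on a single node:
//   Stream<i32, Process<'a, Leader>, ...>
// One shard of the data per cluster member, each member running the same operators:
//   Stream<i32, Cluster<'a, Worker>, ...>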

To start, we set up a new module in src/first_ten_cluster.rs with a dataflow program that takes in a Process for the leader and a Cluster for the set of workers.

src/first_ten_cluster.rs
use hydro_lang::*;

pub struct Leader {}
pub struct Worker {}

pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a, Worker>) {

We start by materializing a stream of numbers on the leader, as before. But rather than sending the stream to a single process, we will instead distribute the data to each member of the cluster using round_robin_bincode. This API places data on a cluster in a round-robin fashion by using the order of elements to determine which cluster member each element is sent to.

info

There are a variety of APIs for sending data to and receiving data from clusters. For example, we can use broadcast_bincode to send a copy of each element to all members (e.g. for replication), or send_bincode if we have a custom partitioning algorithm.
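As a rough sketch of these alternatives (not used in this example, and assuming hydro_lang's ClusterId::from_raw constructor for addressing members):

// Replication: every worker receives a copy of each element.
leader
    .source_iter(q!(0..10))
    .broadcast_bincode(workers);

// Custom partitioning: tag each element with an explicit member ID;
// here we key by value modulo an assumed cluster size of 4.
leader
    .source_iter(q!(0..10))
    .map(q!(|n| (ClusterId::from_raw((n % 4) as u32), n)))
    .send_bincode(workers);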

src/first_ten_cluster.rs
leader
    .source_iter(q!(0..10)) // : Stream<i32, Process<Leader>, ...>
    .round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>

On each cluster member, we will then do some work to transform the data (using map) and log the transformed values locally (using inspect, which is useful for debugging).

src/first_ten_cluster.rs
    .map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
    .inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>

Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: send_bincode_interleaved. If we used send_bincode, we would get a stream of (cluster ID, data) tuples. Since it is a common pattern to ignore the IDs, send_bincode_interleaved is available as a helper.

src/first_ten_cluster.rs
    .send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
    .for_each(q!(|n| println!("{}", n)));
}
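If we instead wanted to know which worker produced each value, a minimal variation (a sketch, not part of this module) would swap the final operators for send_bincode and destructure the tagged tuples, assuming ClusterId implements Debug:

    .send_bincode(leader) // : Stream<(ClusterId<Worker>, i32), Process<Leader>, ...>
    .for_each(q!(|(id, n)| println!("from {:?}: {}", id, n)));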

Deploying Clusters

Deployment scripts are similar to before, except that when provisioning a cluster we provide a list of deployment hosts rather than a single one. In our example, we'll launch 4 nodes for the cluster by creating a Vec of 4 localhost instances.

examples/first_ten_cluster.rs
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydro_lang::FlowBuilder::new();
    let leader = flow.process();
    let workers = flow.cluster();
    hydro_template::first_ten_cluster::first_ten_cluster(&leader, &workers);

    let _nodes = flow
        .with_process(&leader, deployment.Localhost())
        .with_cluster(&workers, vec![deployment.Localhost(); 4])
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}
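The cluster size is just the length of the Vec passed to with_cluster, so scaling out is a one-line change. For example, to run eight local workers instead:

    .with_cluster(&workers, vec![deployment.Localhost(); 8])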

We can then launch the program:

cargo run --example first_ten_cluster
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 0
[hydro_template::first_ten_cluster::Worker (cluster 1) / 2] 4
[hydro_template::first_ten_cluster::Worker (cluster 1) / 2] 12
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 8
[hydro_template::first_ten_cluster::Worker (cluster 1) / 3] 6
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 2
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 10
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 18
[hydro_template::first_ten_cluster::Leader (process 0)] 0
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 16
[hydro_template::first_ten_cluster::Worker (cluster 1) / 3] 14
[hydro_template::first_ten_cluster::Leader (process 0)] 8
[hydro_template::first_ten_cluster::Leader (process 0)] 16
[hydro_template::first_ten_cluster::Leader (process 0)] 2
[hydro_template::first_ten_cluster::Leader (process 0)] 10
[hydro_template::first_ten_cluster::Leader (process 0)] 18
[hydro_template::first_ten_cluster::Leader (process 0)] 4
[hydro_template::first_ten_cluster::Leader (process 0)] 12
[hydro_template::first_ten_cluster::Leader (process 0)] 6
[hydro_template::first_ten_cluster::Leader (process 0)] 14

You'll notice the round-robin distribution in action here, as each cluster log line is tagged with the ID of the member that produced it (e.g. / 0). In our deployment, we send data round-robin across the 4 members of the cluster, numbered 0 through 3. Hence cluster member 0 receives values 0, 4, 8 (and logs the doubled values 0, 8, 16), member 1 receives values 1, 5, 9, and so on.
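Writing the assignment out explicitly (each element is routed by its position in the stream, which here equals its value):

element n:          0  1  2  3  4  5  6  7  8  9
member (n mod 4):   0  1  2  3  0  1  2  3  0  1
logged (2 * n):     0  2  4  6  8 10 12 14 16 18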