# Scaling with Clusters
So far, we have looked at distributed systems where each process is running a different piece of the compute graph -- compute parallelism. However, we can also use Hydro to run the same computation on multiple processes -- achieving data parallelism (e.g. partitioning). This is done by creating a cluster of processes that all run the same subgraph of code.
## Dataflow with Clusters
Just like we use the `Process` type to represent a virtual handle to a single node, we can use the `Cluster` type to represent a handle to a set of nodes (whose size is unknown at compile time). Programming with a `Stream` located on a `Cluster` resembles SIMD-style programming: each cluster member executes the same operators, but on different pieces of data.
To start, we set up a new module in `src/first_ten_cluster.rs` with a dataflow program that takes in a `Process` for a leader and a `Cluster` for a set of workers.
```rust
use hydro_lang::*;

pub struct Leader {}
pub struct Worker {}

pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a, Worker>) {
```
We start by materializing a stream of numbers on the `leader`, as before. But rather than sending the stream to a single process, we distribute the data across the members of the cluster using `round_robin_bincode`. This API places data on a `cluster` in a round-robin fashion, using the order of elements to determine which cluster member each element is sent to.
There are a variety of APIs for sending data to and receiving data from clusters. For example, we can use `broadcast_bincode` to send copies to all members (e.g. for replication), or `send_bincode` if we have a custom partitioning algorithm.
```rust
    leader
        .source_iter(q!(0..10)) // : Stream<i32, Process<Leader>, ...>
        .round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>
```
On each cluster member, we then do some work to transform the data (using `map`) and log the transformed values locally (using `inspect`, which is useful for debugging).
```rust
        .map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
        .inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>
```
Finally, we send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples; since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper.
```rust
        .send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
        .for_each(q!(|n| println!("{}", n)));
}
```
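As an aside, the alternative distribution APIs mentioned above slot into the same pipeline shape. Below is a minimal sketch of both; the `broadcast_bincode` call matches the description above, while the `(ClusterId<Worker>, value)` tuple shape and the `ClusterId::from_raw` constructor used for custom partitioning are assumptions about the explicit-addressing API, so double-check them against the `hydro_lang` docs before relying on them.

```rust
    // Sketch: replication -- every worker receives a copy of every element.
    leader
        .source_iter(q!(0..10))
        .broadcast_bincode(workers); // : Stream<i32, Cluster<Worker>, ...>

    // Sketch (assumed API): custom partitioning -- tag each element with the
    // ClusterId of the worker that should receive it, then send by that tag.
    leader
        .source_iter(q!(0..10))
        .map(q!(|n| (ClusterId::from_raw((n % 4) as u32), n)))
        .send_bincode(workers); // : Stream<i32, Cluster<Worker>, ...>
```

Symmetrically, replacing `send_bincode_interleaved(leader)` with `send_bincode(leader)` on the worker side would deliver `(cluster ID, data)` tuples to the leader, preserving which worker produced each value.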
## Deploying Clusters
Deployment scripts are similar to before, except that when provisioning a cluster we provide a list of deployment hosts rather than a single one. In our example, we'll launch 4 nodes for the cluster by creating a `Vec` of 4 localhost instances.
```rust
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydro_lang::FlowBuilder::new();
    let leader = flow.process();
    let workers = flow.cluster();
    hydro_template::first_ten_cluster::first_ten_cluster(&leader, &workers);

    let _nodes = flow
        .with_process(&leader, deployment.Localhost())
        .with_cluster(&workers, vec![deployment.Localhost(); 4])
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}
```
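Note that the cluster's size appears only in this deployment script, never in the dataflow itself, so scaling out is a deploy-time change. For example (a hypothetical tweak to the script above), the same program would run unchanged on 8 workers:

```rust
        // Same dataflow, twice the workers: only the deployment script changes.
        .with_cluster(&workers, vec![deployment.Localhost(); 8])
```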
We can then launch the program:

```bash
cargo run --example first_ten_cluster
```

```
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 0
[hydro_template::first_ten_cluster::Worker (cluster 1) / 2] 4
[hydro_template::first_ten_cluster::Worker (cluster 1) / 2] 12
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 8
[hydro_template::first_ten_cluster::Worker (cluster 1) / 3] 6
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 2
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 10
[hydro_template::first_ten_cluster::Worker (cluster 1) / 1] 18
[hydro_template::first_ten_cluster::Leader (process 0)] 0
[hydro_template::first_ten_cluster::Worker (cluster 1) / 0] 16
[hydro_template::first_ten_cluster::Worker (cluster 1) / 3] 14
[hydro_template::first_ten_cluster::Leader (process 0)] 8
[hydro_template::first_ten_cluster::Leader (process 0)] 16
[hydro_template::first_ten_cluster::Leader (process 0)] 2
[hydro_template::first_ten_cluster::Leader (process 0)] 10
[hydro_template::first_ten_cluster::Leader (process 0)] 18
[hydro_template::first_ten_cluster::Leader (process 0)] 4
[hydro_template::first_ten_cluster::Leader (process 0)] 12
[hydro_template::first_ten_cluster::Leader (process 0)] 6
[hydro_template::first_ten_cluster::Leader (process 0)] 14
```
You'll notice the round-robin distribution in action here, as each cluster log is tagged with the ID of the member (e.g. `/ 0`). In our deployment, we are sending data round-robin across the 4 members of the cluster, numbered `0` through `3`. Hence cluster member `0` receives values `0`, `4`, `8` (which it doubles and prints as `0`, `8`, `16`), member `1` receives values `1`, `5`, `9` (printed as `2`, `10`, `18`), and so on.
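As a sanity check, the placement rule is easy to reproduce outside Hydro: since `round_robin_bincode` uses element order alone, element `i` always lands on member `i % 4`. A few lines of plain Rust (a standalone illustration, not part of the template) predict exactly the assignments seen in the logs above:

```rust
fn main() {
    let members = 4;
    for n in 0..10 {
        // Round-robin: the i-th element goes to member i % members;
        // the worker then doubles it before printing.
        println!("value {n} -> worker {}, printed as {}", n % members, n * 2);
    }
}
```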