
Scaling with Clusters

So far, we have looked at distributed systems where each piece of the compute graph runs on a single process, which gives us compute parallelism (like pipelining). We can also use Hydroflow+ to run the same computation on multiple processes, which gives us data parallelism (like replication and partitioning). We do this by creating a cluster of processes that all run the same subgraph.
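To make the two data-parallel strategies concrete, here is a minimal sketch in plain Rust. It does not use any Hydroflow+ APIs; the function names `replicate` and `partition` and the modulo-based routing rule are assumptions chosen only for illustration.

```rust
// Plain-Rust model of two data-parallel strategies; no Hydroflow+ APIs are used.
// `replicate`, `partition`, and the modulo routing rule are illustrative assumptions.

/// Replication: every worker receives a copy of every item.
fn replicate(items: &[u32], num_workers: usize) -> Vec<Vec<u32>> {
    (0..num_workers).map(|_| items.to_vec()).collect()
}

/// Partitioning: each item goes to exactly one worker, chosen by `item % num_workers`.
fn partition(items: &[u32], num_workers: usize) -> Vec<Vec<u32>> {
    assert!(num_workers > 0, "need at least one worker");
    let mut shards = vec![Vec::new(); num_workers];
    for &item in items {
        shards[item as usize % num_workers].push(item);
    }
    shards
}

fn main() {
    let items: Vec<u32> = (0..6).collect();
    println!("replicated:  {:?}", replicate(&items, 2));
    println!("partitioned: {:?}", partition(&items, 2));
}
```

The broadcast example developed in this section corresponds to the replication case: every worker in the cluster sees every item.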

Creating Clusters

Just like we use ProcessSpec to create processes, we use ClusterSpec to create clusters. We can then use the flow.cluster(spec) method to instantiate a cluster in our graph. Let's create a simple application where a leader process broadcasts data to a cluster of workers.

We start with the standard architecture, with a flow graph and a runtime entrypoint, but now take a cluster spec in addition to a process spec.

Tip

If you have been following along with the Hydroflow+ template, you'll now need to declare a new module for this example. Create a new file at flow/src/broadcast.rs and add the following to flow/src/lib.rs:

flow/src/lib.rs
pub mod broadcast;

flow/src/broadcast.rs
use hydroflow_plus::*;
use stageleft::*;

pub fn broadcast<'a, D: Deploy<'a>>(
    flow: &FlowBuilder<'a, D>,
    process_spec: &impl ProcessSpec<'a, D>,
    cluster_spec: &impl ClusterSpec<'a, D>,
) {
    let leader = flow.process(process_spec);
    let workers = flow.cluster(cluster_spec);
}

Broadcasting Data

When sending data between individual processes, we used the send_bincode operator. When sending data from a process to a cluster, we can use the broadcast_bincode operator instead.

let data = flow.source_iter(&leader, q!(0..10));
data
    .broadcast_bincode(&workers)
    .for_each(q!(|n| println!("{}", n)));

The Stream returned by broadcast_bincode represents the data received on each process in the cluster. Because all processes in a cluster run the exact same computation, we can then use the for_each operator directly on that stream to print the data on each process.
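The broadcast semantics described above can be modeled in plain Rust with threads and channels. This is only a sketch of the behavior, not how Hydroflow+ implements `broadcast_bincode`; the function `broadcast_to_workers` and the use of `std::sync::mpsc` are assumptions made for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Illustrative model only: sends every item to every worker thread and
/// returns what each worker received. Not a Hydroflow+ API.
fn broadcast_to_workers(items: Vec<u32>, num_workers: usize) -> Vec<Vec<u32>> {
    let mut senders = Vec::new();
    let mut handles = Vec::new();

    for idx in 0..num_workers {
        let (tx, rx) = mpsc::channel::<u32>();
        senders.push(tx);
        // Every worker runs the same closure, mirroring "the exact same computation".
        handles.push(thread::spawn(move || {
            let mut received = Vec::new();
            for n in rx {
                println!("[worker/{}] {}", idx, n);
                received.push(n);
            }
            received
        }));
    }

    // The leader sends a copy of each item to every worker.
    for n in items {
        for tx in &senders {
            tx.send(n).expect("worker hung up");
        }
    }
    // Dropping the senders closes the channels so the worker loops end.
    drop(senders);

    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}

fn main() {
    let received = broadcast_to_workers((0..10).collect(), 2);
    assert!(received.iter().all(|r| r.len() == 10));
}
```

In this model each worker receives the items in the order the leader sent them, while the printed lines from different workers may interleave differently from run to run.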

Deploying Graphs with Clusters

To deploy this application, we must set up the runtime entrypoint and the Hydro Deploy configuration. The entrypoint looks similar to before, but now uses the CLI data to instantiate the cluster as well.

flow/src/broadcast.rs
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn broadcast_runtime<'a>(
    flow: FlowBuilder<'a, CLIRuntime>,
    cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
    broadcast(&flow, &cli, &cli);
    flow.extract()
        .optimize_default()
        .with_dynamic_id(q!(cli.meta.subgraph_id))
}

Our binary (flow/src/bin/broadcast.rs) looks similar to before:

flow/src/bin/broadcast.rs
#[tokio::main]
async fn main() {
    let ports = hydroflow_plus::util::cli::init().await;

    hydroflow_plus::util::cli::launch_flow(
        flow::broadcast::broadcast_runtime!(&ports)
    ).await;
}

Finally, our deployment script (flow/examples/broadcast.rs) instantiates services for both the leader process and the workers. Because the deployment is shared by both spec closures, each of which needs mutable access to it, we wrap it in a RefCell. Since this script defines the physical deployment, the cluster spec explicitly instantiates multiple services and returns them as a Vec. We also set a display name for each service so that we can tell them apart in the logs.

flow/examples/broadcast.rs
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::{DeployProcessSpec, DeployClusterSpec};

#[tokio::main]
async fn main() {
    let deployment = RefCell::new(Deployment::new());
    let localhost = deployment.borrow_mut().Localhost();
    let profile = "dev";

    let builder = hydroflow_plus::FlowBuilder::new();
    flow::broadcast::broadcast(
        &builder,
        &DeployProcessSpec::new(|| {
            let mut deployment = deployment.borrow_mut();
            deployment.add_service(
                HydroflowCrate::new(".", localhost.clone())
                    .bin("broadcast")
                    .profile(profile)
                    .display_name("leader"),
            )
        }),
        &DeployClusterSpec::new(|| {
            let mut deployment = deployment.borrow_mut();
            (0..2)
                .map(|idx| {
                    deployment.add_service(
                        HydroflowCrate::new(".", localhost.clone())
                            .bin("broadcast")
                            .profile(profile)
                            .display_name(format!("worker/{}", idx)),
                    )
                })
                .collect()
        }),
    );

    let mut deployment = deployment.into_inner();

    deployment.deploy().await.unwrap();

    deployment.start().await.unwrap();

    tokio::signal::ctrl_c().await.unwrap()
}
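The RefCell pattern used in the script above can be seen in isolation with a small standard-library sketch. Everything here is hypothetical: `FakeDeployment`, `build`, and `deploy_names` are stand-ins invented for illustration and are not part of hydro_deploy or Hydroflow+. The point is only that two closures cannot both hold a `&mut` to the same value, whereas both can capture a shared reference to a RefCell and borrow it mutably when they are called.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a deployment: it just records service names.
#[derive(Default)]
struct FakeDeployment {
    services: Vec<String>,
}

impl FakeDeployment {
    /// Registers a service and returns its index.
    fn add_service(&mut self, name: String) -> usize {
        self.services.push(name);
        self.services.len() - 1
    }
}

/// Hypothetical builder: calls a "process" closure (one service) and a
/// "cluster" closure (a Vec of services).
fn build(
    process: impl Fn() -> usize,
    cluster: impl Fn() -> Vec<usize>,
) -> (usize, Vec<usize>) {
    (process(), cluster())
}

fn deploy_names(num_workers: usize) -> Vec<String> {
    let deployment = RefCell::new(FakeDeployment::default());

    let _ = build(
        // Each closure captures only `&deployment`; the mutable borrow is
        // taken when the closure runs and released when it returns.
        || deployment.borrow_mut().add_service("leader".to_string()),
        || {
            let mut deployment = deployment.borrow_mut();
            (0..num_workers)
                .map(|idx| deployment.add_service(format!("worker/{}", idx)))
                .collect()
        },
    );

    // Once both closures are gone, we can take the inner value back out.
    deployment.into_inner().services
}

fn main() {
    println!("{:?}", deploy_names(2));
}
```

As in the deployment script, the cluster closure returns one entry per worker, and `into_inner` recovers the plain value once the closures are no longer needed.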

If we run this script, we should see the following output:

cargo run -p flow --example broadcast
[worker/0] 0
[worker/1] 0
[worker/0] 1
[worker/1] 1
...