Skip to main content

Adding Distribution

Continuing from our previous example, we will now look at how to extend our program to run on multiple processes. Recall that our previous flow graph looked like this:

flow/src/first_ten.rs
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
let numbers = flow.source_iter(&process, q!(0..10));
numbers.for_each(q!(|n| println!("{}", n)));
}

The Flow Graph

Let's extend this example to print the numbers on a separate process. First, we need to specify that our flow graph will involve the network. We do this by replacing the LocalDeploy<'a> trait bound with the general Deploy<'a>. Then, we can use the process_spec to create a second process:

flow/src/first_ten_distributed.rs
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: Deploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
let second_process = flow.process(process_spec);
}

Now, we can distribute our dataflow by using the send_bincode operator to mark where the data should be sent using bincode serialization.

let numbers = flow.source_iter(&process, q!(0..10));
numbers
.send_bincode(&second_process)
.for_each(q!(|n| println!("{}", n)));

The Runtime

Now that our graph spans multiple processes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with just the optimized dataflow. Instead, we must take the subgraph ID as a runtime parameter through with_dynamic_id to select the appropriate graph. In addition, our dataflow involves the network, so we take a HydroCLI runtime parameter (cli) so that processes can look up their network connections and instantiate the flow graph with access to it.

flow/src/first_ten_distributed.rs
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten_distributed(&flow, &cli);
flow.extract()
.optimize_default()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

The corresponding binary in src/bin/first_ten_distributed.rs then instantiates the CLI and reads the process ID from the command line arguments:

flow/src/bin/first_ten_distributed.rs
#[tokio::main]
async fn main() {
hydroflow_plus::util::cli::launch!(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}

The Deployment

Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using Hydro Deploy. Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file examples/first_ten_distributed.rs with the following contents:

flow/examples/first_ten_distributed.rs
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::DeployProcessSpec;

#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
flow::first_ten::first_ten_distributed(
&builder,
&DeployProcessSpec::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev"),
)
}),
);

deployment.deploy().await.unwrap();

deployment.start().await.unwrap();

tokio::signal::ctrl_c().await.unwrap()
}

Most importantly, we specify a DeployProcessSpec, which takes a closure that constructs a Hydro Deploy service for each process in the flow graph. In our case, we use the HydroflowCrate service type, which deploys a Hydroflow+ binary. We also specify the process ID as a command line argument, which is read by our runtime binary.

We can then run our distributed dataflow with:

cargo run -p flow --example first_ten_distributed
[service/1] 0
[service/1] 1
[service/1] 2
[service/1] 3
[service/1] 4
[service/1] 5
[service/1] 6
[service/1] 7
[service/1] 8
[service/1] 9