Skip to main content

Clusters

A key restriction of processes in Hydroflow+ is that there can only be one instance of the computation assigned to each process across the entire distributed system. This is fine for simple applications with only pipelined computation, but for scaling out we need the ability to have multiple instances of the same computation running in parallel.

Clusters solve this by providing an nearly-identical API to processes, but representing a set of instances running the same computation instead of a single one. What changes when using a cluster is sending data, since we need to specify which instance(s) of the computation to send the data to.

Computing on Clusters

Instantiating clusters is done using the cluster method on FlowBuilder, taking a ClusterSpec:

pub fn my_flow<'a, D: Deploy<'a>>(
flow: &'a FlowBuilder<'a, D>,
cluster_spec: &impl ClusterSpec<'a, D>
) {
let cluster = flow.cluster(cluster_spec);
}

This API follows the same pattern as processes, where a cluster spec represents a template for a cluster, which can be instantiated multiple times to create multiple clusters.

Instantiating streams on clusters uses the same APIs as streams: source_iter and source_stream are both available. But when using these APIs, the root streams will be instantiated on all instances in the cluster.

let stream = cluster.source_iter(q!(vec![1, 2, 3]));

stream.for_each(q!(|x| println!("{}", x)))
// will print 1, 2, 3 on **each** instance

Sending Data

Because clusters represent a set of instances, adding networking requires us to specify which instance(s) to send data to. Clusters provide different APIs depending on if the source or receiver is a cluster or a process.

Elements in a cluster are identified by a cluster ID (a u32). To get the IDs of all instances in a cluster, use the ids method on cluster, which returns a runtime expression of type &Vec<u32> (which can only be used inside q!() or as an argument to source_iter). All IDs always are ranging from 0 through the length of the IDs vector.

This can then be passed into source_iter to load the IDs into the graph.

let stream = process.source_iter(cluster.ids()).cloned();

One-to-Many

When sending data from a process to a cluster, the primary API is demux_{bincode,bytes}, which takes a stream of tuples of the form (u32, T) and sends each T element to the instance with the matching u32 ID.

This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using enumerate to add a sequence number to each element, then using demux_bincode to send each element to the instance with the matching sequence number:

let cluster_ids = cluster.ids();
let stream = process.source_iter(q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
i % cluster_ids.len() as u32,
x
)))
.demux_bincode(cluster);

To broadcast data to all instances in a cluster, use broadcast_{bincode,bytes}, which acts as a shortcut that uses demux under the hood.

let stream = process.source_iter(q!(vec![123, 456, 789]))
.broadcast_bincode(cluster);

Many-to-One

In the other direction, sending data from a cluster to a process, the primary API is send_{bincode,bytes}_tagged. On the sender side, we have a stream of elements of type T, but on the recipient side we get a stream of tuples of the form (u32, T), where the u32 is the ID of the instance that sent the element. The elements received from different instances will be interleaved.

This is useful for aggregating data from multiple instances into a single stream. For example, we can use send_bincode_tagged to send data from all instances to a single process, and then print them all out:

let stream = cluster.source_iter(q!(vec![123, 456, 789]))
.send_bincode_tagged(process)
.for_each(q!(|(id, x)| println!("{}: {}", id, x)));

If you don't care which instance sent the data, you can use send_{bincode,bytes}_interleaved, where the recipient receives a stream of T elements, but the elements received from different instances will be interleaved.

let stream = cluster.source_iter(q!(vec![123, 456, 789]))
.send_bincode_interleaved(process)
.for_each(q!(|x| println!("{}", x)));

Many-to-Many

Finally, when sending data from one cluster to another (or to itself as in distributed protocols), the primary API is demux_{bincode,bytes}_tagged, which takes a stream of tuples of the form (u32, T) and sends each T element to the instance with the matching u32 ID, but the recipient also gets (u32, T) tuples with the ID of the sender.

There are also broadcast_{bincode,bytes}_tagged and {demux,broadcast}_{bincode,bytes}_interleaved shortcuts available. For example, we can use broadcast_bincode_interleaved to send data from all instances in a cluster to all instances in another cluster, and then print them all out:

let stream = cluster1.source_iter(q!(vec![123, 456, 789]))
.broadcast_bincode_interleaved(cluster2)
.for_each(q!(|x| println!("{}", x)));