Clusters
When building scalable distributed systems in Hydro, you'll often need to use clusters, which represent groups of threads all running the same piece of your program (Single-Program-Multiple-Data, or "SPMD"). Hydro clusters can be used to implement scale-out systems using techniques such as sharding or replication. Unlike processes, the number of threads in a cluster does not need to be static, and can be chosen during deployment.
Like when creating a process, you can pass in a type parameter to a cluster to distinguish it from other clusters. For example, you can create a cluster with the marker type Worker to represent a pool of workers in a distributed system:
struct Worker {}
let flow = FlowBuilder::new();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
You can then instantiate a live collection on the cluster using the same APIs as for processes. For example, you can create a stream of integers on the worker cluster. If you launch this program, each member of the cluster will create a stream containing the elements 1, 2, 3, and 4:
let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
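To make the SPMD model concrete, here is a rough analogy in plain Rust that uses only the standard library, not Hydro. Each OS thread stands in for one cluster member and runs the same function. The number of members is an ordinary runtime argument, much like a cluster size chosen at deployment. The names member_program and run_cluster are hypothetical. In Hydro, the deployment system places the members, so your code does not spawn them itself.

```rust
use std::thread;

/// The "single program" that every simulated cluster member runs.
/// Each member builds the same local collection, mirroring
/// `workers.source_iter(q!(vec![1, 2, 3, 4]))` (hypothetical stand-in, not Hydro).
fn member_program(_member_id: u32) -> Vec<i32> {
    vec![1, 2, 3, 4]
}

/// Spawn `num_members` threads that all run `member_program`, then collect
/// each member's local collection, indexed by member ID.
fn run_cluster(num_members: u32) -> Vec<Vec<i32>> {
    let handles: Vec<_> = (0..num_members)
        .map(|id| thread::spawn(move || member_program(id)))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("member thread panicked"))
        .collect()
}

fn main() {
    // The cluster size is a runtime value, not a compile-time constant.
    let results = run_cluster(4);
    for (id, local) in results.iter().enumerate() {
        println!("member {id}: {local:?}");
    }
}
```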
Networking
When sending a live collection from a cluster to another location, each member of the cluster sends its local collection. On the receiver side, these collections are joined together into a keyed stream with ID keys and groups of Data values, where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the send_bincode method:
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process) // KeyedStream<MemberId<Worker>, i32, ...>
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [1], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [1], MemberId::<Worker>(3): [1] }
If you do not need to know which member of the cluster the data came from, you can use the values() method on the keyed stream, which drops the IDs at the receiver:
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process).values()
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
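The receiver-side bookkeeping can be pictured with a small standard-library sketch, which is not Hydro's implementation. Incoming (sender ID, value) messages are grouped into a map keyed by sender, which is roughly what the keyed stream represents. A hypothetical drop_ids helper plays the role of values(), and a plain u32 stands in for MemberId. The BTreeMap sorts by member ID only as an artifact of this sketch. It says nothing about the order in which Hydro delivers elements from different members.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for Hydro's `MemberId`: just the raw member index.
type RawMemberId = u32;

/// Receiver side of a cluster-to-process send: group every
/// (sender, value) message by the sender's ID, like a keyed stream.
fn group_by_sender(messages: &[(RawMemberId, i32)]) -> BTreeMap<RawMemberId, Vec<i32>> {
    let mut groups: BTreeMap<RawMemberId, Vec<i32>> = BTreeMap::new();
    for &(sender, value) in messages {
        groups.entry(sender).or_default().push(value);
    }
    groups
}

/// Analogue of `.values()`: forget which member sent each element.
fn drop_ids(groups: &BTreeMap<RawMemberId, Vec<i32>>) -> Vec<i32> {
    groups.values().flatten().copied().collect()
}

fn main() {
    // Four members each send their local stream `[1]`.
    let messages: Vec<(RawMemberId, i32)> = (0..4).map(|id| (id, 1)).collect();
    let keyed = group_by_sender(&messages);
    println!("{keyed:?}"); // {0: [1], 1: [1], 2: [1], 3: [1]}
    println!("{:?}", drop_ids(&keyed)); // [1, 1, 1, 1]
}
```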
In the reverse direction, when sending a stream to a cluster, the sender must prepare (ID, Data) tuples, where the ID uniquely identifies which member of the cluster the data is intended for. Then, we can send a stream from a process to the worker cluster using the demux_bincode method:
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (MemberId::from_raw(x), x)))
.demux_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [0], MemberId::<Worker>(1): [1], MemberId::<Worker>(2): [2], MemberId::<Worker>(3): [3] }
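A typical reason to demux is sharding. The sketch below is plain Rust rather than Hydro. It assumes a hypothetical policy that routes each key-value pair to member key % num_members. The shard_for function corresponds to the map step that builds (ID, Data) tuples, and demux models delivering each tuple to its member's inbox. The policy and all names are illustrative assumptions and are not part of Hydro.

```rust
/// Hypothetical sharding policy (not part of Hydro): pick the target
/// member by taking the key modulo the number of members.
fn shard_for(key: u64, num_members: u32) -> u32 {
    (key % num_members as u64) as u32
}

/// Analogue of the sender-side `map` + `demux_bincode`: tag each element
/// with a destination member ID, then deliver it to that member's inbox.
fn demux(items: &[(u64, &str)], num_members: u32) -> Vec<Vec<(u64, String)>> {
    assert!(num_members > 0, "a cluster needs at least one member");
    let mut inboxes: Vec<Vec<(u64, String)>> = vec![Vec::new(); num_members as usize];
    for &(key, value) in items {
        let target = shard_for(key, num_members);
        inboxes[target as usize].push((key, value.to_string()));
    }
    inboxes
}

fn main() {
    let writes: [(u64, &str); 4] = [(0, "a"), (1, "b"), (5, "c"), (6, "d")];
    for (member, inbox) in demux(&writes, 4).iter().enumerate() {
        println!("member {member}: {inbox:?}");
    }
}
```

Because every key always maps to the same member, all updates to a given key land on one shard. The trade-off is that changing the cluster size remaps most keys.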
Broadcasting and Membership Lists
A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using broadcast_bincode, which takes in a stream of only data elements and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<Worker>(0): [123], MemberId::<Worker>(1): [123], MemberId::<Worker>(2): [123], MemberId::<Worker>(3): [123] }
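Conceptually, a broadcast fans each element out to every member that the sender currently knows about. The tiny model below shows why the call above carries a nondet! guard. The same broadcast reaches a different set of members depending on whether a join has been observed yet. This is a standard-library sketch with a hypothetical broadcast function. It is not how broadcast_bincode is implemented.

```rust
use std::collections::BTreeSet;

/// Analogue of `broadcast_bincode`: deliver a copy of every element to
/// each member in the sender's *current* view of the membership list.
fn broadcast(items: &[i32], known_members: &BTreeSet<u32>) -> Vec<(u32, i32)> {
    let mut sent = Vec::new();
    for &member in known_members {
        for &item in items {
            sent.push((member, item));
        }
    }
    sent
}

fn main() {
    let mut view: BTreeSet<u32> = [0, 1, 2].into_iter().collect();
    // Broadcast before member 3 is known: only three copies go out.
    println!("{:?}", broadcast(&[123], &view)); // [(0, 123), (1, 123), (2, 123)]
    // After learning that member 3 joined, the same broadcast reaches four members.
    view.insert(3);
    println!("{:?}", broadcast(&[123], &view));
}
```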
This API requires a non-determinism guard, because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the broadcast_bincode API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the source_cluster_members method to get a stream of membership events (cluster members joining or leaving):
let p1 = flow.process::<()>();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
let cluster_members = p1.source_cluster_members(&workers);
// if there are 4 members in the cluster, we would see a join event for each
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(1): [MembershipEvent::Join], ... }
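A consumer of membership events usually folds them into the set of members it currently believes to be alive. The sketch below does this over an in-memory list of events in plain Rust. MembershipChange is a hypothetical enum that stands in for Hydro's membership event type, whose exact shape may differ.

```rust
use std::collections::BTreeSet;

/// Hypothetical mirror of a membership event (not Hydro's own type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MembershipChange {
    Join,
    Leave,
}

/// Fold a sequence of (member, event) pairs into the set of members
/// currently believed to be alive. A `Leave` for an unknown member is a no-op.
fn current_members(events: &[(u32, MembershipChange)]) -> BTreeSet<u32> {
    let mut members = BTreeSet::new();
    for &(id, event) in events {
        match event {
            MembershipChange::Join => {
                members.insert(id);
            }
            MembershipChange::Leave => {
                members.remove(&id);
            }
        }
    }
    members
}

fn main() {
    use MembershipChange::{Join, Leave};
    let events = [(0u32, Join), (1, Join), (2, Join), (3, Join), (1, Leave)];
    println!("{:?}", current_members(&events)); // {0, 2, 3}
}
```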
Self-Identification
In some programs, it may be necessary for cluster members to know their own ID (for example, to construct a ballot in Paxos). In Hydro, this can be achieved by using the CLUSTER_SELF_ID constant, which can be used inside q!(...) blocks to get the current cluster member's ID:
let workers: Cluster<()> = flow.cluster::<()>();
let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.raw_id % 2 == 0))
.map(q!(|x| format!("hello from {}", x.raw_id)))
.send_bincode(&process)
.values()
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
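As an example of why a member might need its own ID, consider the Paxos ballot mentioned above. A common construction, assumed here rather than taken from Hydro, orders ballots by round number and breaks ties with the proposer's ID. As a result, distinct members can never produce equal ballots. The sketch is plain Rust. In a Hydro program, proposer_id would presumably be derived from CLUSTER_SELF_ID (for example, its raw_id), but the sketch itself does not depend on that.

```rust
/// A Paxos-style ballot, ordered first by round, then by proposer ID.
/// Deriving `Ord` on a struct compares fields in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Ballot {
    round: u64,
    proposer_id: u32, // assumed to come from the member's own ID
}

impl Ballot {
    /// The smallest ballot owned by `self_id` that is strictly greater than `seen`.
    fn next_after(seen: Ballot, self_id: u32) -> Ballot {
        let same_round = Ballot { round: seen.round, proposer_id: self_id };
        if same_round > seen {
            same_round
        } else {
            Ballot { round: seen.round + 1, proposer_id: self_id }
        }
    }
}

fn main() {
    let seen = Ballot { round: 3, proposer_id: 1 };
    println!("{:?}", Ballot::next_after(seen, 2)); // Ballot { round: 3, proposer_id: 2 }
    println!("{:?}", Ballot::next_after(seen, 0)); // Ballot { round: 4, proposer_id: 0 }
}
```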
You can only use CLUSTER_SELF_ID in code that will run on a Cluster<_>, such as when calling Stream::map on a stream that is located on a cluster. If you try to use it in code that will run on a Process<_>, you'll get a compile-time error:
let process: Process<()> = flow.process::<()>();
process.source_iter(q!([CLUSTER_SELF_ID]));
// error[E0277]: the trait bound `ClusterSelfId<'_>: FreeVariableWithContext<hydro_lang::Process<'_>>` is not satisfied
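The error message indicates that the type behind CLUSTER_SELF_ID (ClusterSelfId) does not implement FreeVariableWithContext for process locations, so the type checker rejects it. The same idea can be sketched in plain Rust with marker types. The names OnCluster, OnProcess, HasSelfId, and Ctx are hypothetical. They mimic only the shape of the mechanism and do not reproduce Hydro's actual FreeVariableWithContext machinery.

```rust
use std::marker::PhantomData;

/// Hypothetical location markers, loosely modelled on `Cluster<_>` / `Process<_>`.
struct OnCluster;
struct OnProcess;

/// Hypothetical capability trait: only contexts that have a "self ID" implement it.
trait HasSelfId {
    fn self_id(&self) -> u32;
}

/// A hypothetical per-location context handed to user code.
struct Ctx<L> {
    member_id: u32,
    _loc: PhantomData<L>,
}

// Only the cluster context gets the capability; there is no impl for `Ctx<OnProcess>`.
impl HasSelfId for Ctx<OnCluster> {
    fn self_id(&self) -> u32 {
        self.member_id
    }
}

/// Code that needs the self ID declares it through a trait bound.
fn greeting<C: HasSelfId>(ctx: &C) -> String {
    format!("hello from {}", ctx.self_id())
}

fn main() {
    let cluster_ctx: Ctx<OnCluster> = Ctx { member_id: 2, _loc: PhantomData };
    println!("{}", greeting(&cluster_ctx)); // hello from 2

    let _process_ctx: Ctx<OnProcess> = Ctx { member_id: 0, _loc: PhantomData };
    // greeting(&_process_ctx); // rejected at compile time: Ctx<OnProcess> does not implement HasSelfId
}
```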