A Partitioned Counter
In the previous sections, you built a keyed counter service that runs on a single process. Now you'll scale it horizontally by partitioning the state across a cluster of machines. This section introduces cluster locations and shows how to route data between processes and clusters.
You will learn
- How to perform networking between distributed locations within a single Rust function
- How to use Cluster locations to represent a dynamically sized set of machines
- How to abstract services to run on arbitrary location types
- How to use demux and MemberId to send data to specific cluster members
Abstracting Over Locations
Before partitioning the counter, you need to make the keyed_counter_service function work with any location type, not just Process<EchoServer>. To make it reusable, you can make the location generic by introducing a type parameter L that can be fulfilled by any location type:
pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
    increment_requests: KeyedStream<u32, String, L, Unbounded>,
    get_requests: KeyedStream<u32, String, L, Unbounded>,
) -> (
    KeyedStream<u32, String, L, Unbounded>,
    KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
) {
The L: Location<'a> + NoTick bound means the function works with any location that is outside a Tick (like Process or Cluster). This lets you reuse the same counter logic whether it runs on a single process or on several cluster members. The rest of the function body remains unchanged, because it already works with any location type.
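The idea of writing a function once against a trait bound can be sketched in plain Rust. The trait and both structs below are hypothetical stand-ins invented for this illustration; they are not Hydro's Location, Process, or Cluster types, and the function only returns a label rather than building a dataflow.

```rust
// Hypothetical stand-ins for illustration only; these are NOT Hydro's types.
trait Location {
    /// Human-readable label for where the computation runs.
    fn describe(&self) -> String;
}

struct SingleProcess;

struct MachineGroup {
    members: usize,
}

impl Location for SingleProcess {
    fn describe(&self) -> String {
        "one process".to_string()
    }
}

impl Location for MachineGroup {
    fn describe(&self) -> String {
        format!("cluster of {} members", self.members)
    }
}

/// Written once against the `Location` bound, usable with every implementor.
fn run_counter_on<L: Location>(location: &L) -> String {
    format!("counter running on {}", location.describe())
}
```

Calling run_counter_on(&SingleProcess) and run_counter_on(&MachineGroup { members: 5 }) both compile against the same function body, which is the property the generic keyed_counter_service relies on.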
Introducing Clusters
So far, all your code has run on a single Process location. Now, it's time to scale up your service!
A Cluster location represents a group of machines all running the same code (Single-Program-Multiple-Data, or SPMD). Unlike Process, the number of machines in a cluster doesn't need to be fixed at compile time - you can scale the cluster up or down during deployment. Each member of the cluster has a unique MemberId that identifies it.
Clusters enable scale-out patterns like sharding and replication. When you send data to a cluster, you specify which member should receive each element, allowing you to partition work and state across the cluster.
Create a new file src/partitioned_counter.rs and add mod partitioned_counter; to your src/lib.rs.
You'll build a sharded counter service that partitions keys across cluster members:
pub fn sharded_counter_service<'a>(
    leader: &Process<'a, CounterServer>,
    shard_servers: &Cluster<'a, CounterShard>,
    increment_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
    get_requests: KeyedStream<u32, String, Process<'a, CounterServer>>,
) -> (
    KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded, NoOrder>,
    KeyedStream<u32, (String, usize), Process<'a, CounterServer>, Unbounded, NoOrder>,
) {
Unlike the previous tutorials, this function takes references to both a leader Process and a Cluster of shard servers as parameters. This is necessary because the service will send data between these locations internally: requests arrive at the leader, get routed to the appropriate shard, and responses return to the leader.
Partitioning Data with MemberId
To partition data across the cluster, you need to assign each key to a specific cluster member. You do this by computing a hash of the key and mapping it to a MemberId:
let sharded_increment_requests = increment_requests
    .prefix_key(q!(|(_client, key)| {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        MemberId::from_raw_id(hasher.finish() as u32 % 5)
    }))
    .demux_bincode(shard_servers);
The prefix_key method adds a new key to the front of the keyed stream. Here, you compute a MemberId by hashing the key name and taking modulo 5 (assuming 5 shards). The result is a stream keyed by (MemberId, client_id, key_name).
MemberId::from_raw_id() creates a member ID from a raw integer. In production, you'd typically use the cluster size dynamically rather than hardcoding 5.
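As a minimal sketch of the hashing step with the shard count passed in rather than hardcoded, the helper below uses only the standard library. The name shard_for and its num_shards parameter are hypothetical; how you would obtain the cluster size inside a Hydro program is not shown here. Note that this sketch reduces modulo the shard count before narrowing to u32, whereas the tutorial code casts to u32 first, so the two may assign the same key to different shards.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a key to a shard index in `0..num_shards`.
/// Hypothetical helper: `num_shards` is a parameter instead of a hardcoded 5.
fn shard_for(key: &str, num_shards: u32) -> u32 {
    assert!(num_shards > 0, "a cluster needs at least one shard");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Reduce in u64 first so the final cast to u32 cannot truncate.
    (hasher.finish() % u64::from(num_shards)) as u32
}
```

The same key always maps to the same shard for a given num_shards, which is what keeps all updates for one counter on one member. With plain modulo hashing, changing num_shards reassigns most keys, so resizing a live cluster would need extra care.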
Now you'll use demux_bincode to send data from the leader to the cluster. This is where Hydro fundamentally differs from traditional distributed systems frameworks. In most frameworks, you write separate programs for each service (one for the leader, one for the shards), then configure external message brokers or RPC systems to connect them. You have to manually serialize messages, manage network connections, and coordinate deployment.
In Hydro, you write a single Rust function that describes the entire distributed service. When you call demux_bincode, you're performing network communication right in the middle of your function - but it feels like ordinary Rust code. The Hydro compiler automatically generates the network code, handles serialization, and deploys the right code to each machine. You can reason about your entire distributed system in one place, with full type safety and IDE support.
The demux_bincode method sends each element to the cluster member specified by the first component of the key:
let sharded_increment_requests = increment_requests
    .prefix_key(q!(|(_client, key)| {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        MemberId::from_raw_id(hasher.finish() as u32 % 5)
    }))
    .demux_bincode(shard_servers);
let sharded_get_requests = get_requests
    .prefix_key(q!(|(_client, key)| {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        MemberId::from_raw_id(hasher.finish() as u32 % 5)
    }))
    .demux_bincode(shard_servers);
After demux_bincode, the stream is located at the cluster. The stream returned by a demux operator preserves the remaining keys after the MemberId is consumed for routing. In this case, you are left with a KeyedStream<u32, String, Cluster<'a, CounterShard>> - a stream keyed by client ID whose values are key names, located on the cluster.
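The routing behavior described above can be imitated in plain Rust to make the "consume the first key component" step concrete. This is only a single-machine simulation under stated assumptions: member IDs are modeled as u32, the function name demux is a hypothetical local helper rather than Hydro's operator, and no networking or serialization takes place.

```rust
use std::collections::BTreeMap;

/// Simulates demux: routes each ((member, client), key_name) element to the
/// member named by the first key component, and strips that component.
fn demux(
    elements: Vec<((u32, u32), String)>,
) -> BTreeMap<u32, Vec<(u32, String)>> {
    let mut per_member: BTreeMap<u32, Vec<(u32, String)>> = BTreeMap::new();
    for ((member, client), key_name) in elements {
        // Only (client, key_name) is delivered; the member ID was the address.
        per_member.entry(member).or_default().push((client, key_name));
    }
    per_member
}
```

Each member's inbox holds (client, key name) pairs only, mirroring how the MemberId disappears from the stream type once the data has been delivered.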
Running the Counter on the Cluster
Now you can call the generic keyed_counter_service function with the cluster-located streams:
let (sharded_increment_ack, sharded_get_response) = super::keyed_counter::keyed_counter_service(
    sharded_increment_requests,
    sharded_get_requests,
);
Because keyed_counter_service is generic over the location type, it works seamlessly with Cluster<'a, CounterShard>. Each cluster member independently maintains its own keyed singleton for the keys it's responsible for.
Finally, you need to send the responses back to the leader process:
let increment_ack = sharded_increment_ack.send_bincode(leader).drop_key_prefix();
let get_response = sharded_get_response.send_bincode(leader).drop_key_prefix();
The send_bincode method sends data from the cluster to the leader process. When data moves from a cluster to a process, it arrives as a keyed stream with MemberId as the first key. The drop_key_prefix method removes this MemberId key, leaving just the original keys (client ID and response data).
This is a standard pattern for building a partitioned service in Hydro: prefix a key for routing, demux to the cluster, process, send back, and drop the routing key. You will find similar code in key-value stores and even consensus protocols.
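To see the whole pattern in one place, here is a rough single-machine simulation in plain Rust. ShardedCounter and Shard are hypothetical types made up for this sketch, not Hydro's; method calls stand in for network hops, and returning 0 for a key that was never incremented is an assumption of the sketch rather than documented behavior of the tutorial's service.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// One simulated shard: owns the counters for the keys routed to it.
#[derive(Default)]
struct Shard {
    counts: HashMap<String, usize>,
}

/// Simulated leader plus shards (hypothetical types, not Hydro's).
struct ShardedCounter {
    shards: Vec<Shard>,
}

impl ShardedCounter {
    fn new(num_shards: usize) -> Self {
        assert!(num_shards > 0, "need at least one shard");
        Self {
            shards: (0..num_shards).map(|_| Shard::default()).collect(),
        }
    }

    /// Prefix step: compute the routing key (shard index) for a counter name.
    fn route(&self, key: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() % self.shards.len() as u64) as usize
    }

    /// Demux to the owning shard, update its state, and return the ack.
    fn increment(&mut self, client: u32, key: &str) -> (u32, String) {
        let member = self.route(key);
        *self.shards[member].counts.entry(key.to_string()).or_insert(0) += 1;
        // The shard index is not part of the reply, mirroring `drop_key_prefix`.
        (client, key.to_string())
    }

    /// Demux to the owning shard and return its current count.
    /// Assumption of this sketch: unseen keys read as 0.
    fn get(&self, client: u32, key: &str) -> (u32, (String, usize)) {
        let member = self.route(key);
        let count = self.shards[member].counts.get(key).copied().unwrap_or(0);
        (client, (key.to_string(), count))
    }
}
```

Because increment and get use the same route function, a read for a key always reaches the shard that holds its count, and every other shard stays untouched.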
Testing the Partitioned Counter
The test is similar to the keyed counter test, but now you need to instantiate the cluster (using flow.cluster()) and specify the cluster size (using with_cluster_size):
#[test]
fn test_counter_read_after_write() {
    let flow = FlowBuilder::new();
    let process = flow.process();
    let shards = flow.cluster();
    let (inc_in_port, inc_requests) = process.sim_input();
    let inc_requests = inc_requests.into_keyed();
    let (get_in_port, get_requests) = process.sim_input();
    let get_requests = get_requests.into_keyed();
    let (inc_acks, get_responses) =
        sharded_counter_service(&process, &shards, inc_requests, get_requests);
    let inc_out_port = inc_acks.entries().sim_output();
    let get_out_port = get_responses.entries().sim_output();
    flow.sim()
        .with_cluster_size(&shards, 5)
        .exhaustive(async || {
            inc_in_port.send((1, "abc".to_string()));
            inc_out_port
                .assert_yields_unordered([(1, "abc".to_string())])
                .await;
            get_in_port.send((1, "abc".to_string()));
            get_out_port
                .assert_yields_only_unordered([(1, ("abc".to_string(), 1))])
                .await;
        });
}
The with_cluster_size(&shards, 5) method configures the simulator to create 5 cluster members. The test verifies that the partitioned counter behaves identically to the single-process version, even though the state is now distributed across multiple machines.
Run the test with cargo test -- partitioned_counter to verify the implementation works correctly.
Next Steps
You've completed the quickstart tutorial and built a distributed counter service that scales horizontally. Starting from a single counter, you learned to manage keyed state, regroup streams, and work with distributed properties like ordering guarantees. You then abstracted the logic to work with any location type and partitioned state across a cluster. Throughout, you saw how Hydro's type system enforces correctness properties like atomicity and ordering, even as your service scales from a single process to a distributed cluster.
For more advanced topics and detailed API documentation, check out the framework reference.