Concurrent Clients

In the previous section, you built a counter service for a single client. Now you'll extend it to handle multiple concurrent clients, each with their own independent stream of requests and responses.

You'll learn how to use KeyedStream to handle concurrent clients while maintaining a single global counter:

You will learn

  • How KeyedStream enables handling multiple concurrent clients
  • How .values() combines streams from all clients into a single global stream
  • How to test scenarios with multiple clients incrementing and reading the counter

From Single Client to Multiple Clients

The single-client counter used regular Stream types for inputs and outputs. To handle multiple clients, you need to identify which client sent each request and route responses back to the correct client. You also need the right ordering guarantees: each client's requests must be processed in the order that client sent them, while requests from different clients may interleave arbitrarily. For example, Client 1's first request must be processed before Client 1's second request, but in any order relative to Client 2's requests.

info

Create a new file src/concurrent_clients.rs and add mod concurrent_clients; to your src/lib.rs.

Hydro provides KeyedStream for exactly this purpose. A KeyedStream<K, V> is like a Stream<V>, but each element is associated with a key of type K. Elements with the same key preserve their relative order, while elements with different keys can interleave freely. Here the key is the client ID (type u32):

pub fn concurrent_counter_service<'a>(
    increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
    get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
) -> (
    KeyedStream<u32, (), Process<'a, CounterServer>>, // increment acknowledgments
    KeyedStream<u32, usize, Process<'a, CounterServer>>, // get responses
) {

The service now takes KeyedStream<u32, ()> instead of Stream<()>. Each request is tagged with a client ID, so when client 1 sends an increment request, it appears as a keyed stream element with key 1 and value ().

Aggregating Across All Clients

The counter still maintains a single global count across all clients. Since the client ID doesn't matter for counting, you need to discard it:

let atomic_tick = increment_requests.location().tick();
let increment_request_processing = increment_requests.atomic(&atomic_tick);
let current_count = increment_request_processing.clone().values().count();

The .values() method extracts just the values from the keyed stream, discarding the client IDs since they're irrelevant for computing the global count. This produces a regular Stream<()> containing all increment requests from all clients.

The count() method then aggregates these requests into a Singleton<usize> that tracks the running total. This is a convenient shorthand for computing the current counter value; for custom stateful logic, you would use reduce() or fold() to maintain arbitrary state across the stream.

tip

When you call .values() on a KeyedStream, the resulting stream is unordered because requests from different clients can arrive in any order. Hydro's type system tracks this: .values() returns a stream marked as unordered, restricting you to operations that don't depend on element order. Operations like .count() are allowed because they're commutative, but .last() would not compile.

The rest of the implementation follows the same pattern as the single-client counter. Acknowledgments are routed back to the correct client because the keyed structure is preserved:

let increment_ack = increment_request_processing.end_atomic();

Get responses use cross_singleton, which pairs the count with each client's request while preserving the client ID key:

let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));

    request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
};

Testing Multiple Concurrent Clients

Let's write a test where two different clients increment the counter and both read the result:

#[test]
fn test_concurrent_clients() {
    let flow = FlowBuilder::new();
    let process = flow.process();

    let (inc_in_port, inc_requests) = process.sim_input();
    let inc_requests = inc_requests.into_keyed();

    let (get_in_port, get_requests) = process.sim_input();
    let get_requests = get_requests.into_keyed();

    let (inc_acks, get_responses) = concurrent_counter_service(inc_requests, get_requests);

    let inc_out_port = inc_acks.entries().sim_output();
    let get_out_port = get_responses.entries().sim_output();

    flow.sim().exhaustive(async || {
        // Client 1 increments
        inc_in_port.send((1, ()));
        inc_out_port.assert_yields_unordered([(1, ())]).await;

        // Client 2 increments
        inc_in_port.send((2, ()));
        inc_out_port.assert_yields_unordered([(2, ())]).await;

        // Client 1 reads - should see 2
        get_in_port.send((1, ()));
        get_out_port.assert_yields_unordered([(1, 2)]).await;

        // Client 2 reads - should also see 2
        get_in_port.send((2, ()));
        get_out_port.assert_yields_unordered([(2, 2)]).await;
    });
}

The test uses .into_keyed() to convert regular streams into keyed streams for input, and .entries() to convert keyed streams back to regular streams of (key, value) tuples for output. This is necessary because the simulator currently only supports Stream as inputs and outputs.

Because .entries() flattens keyed streams with non-deterministic interleaving, you must use assert_yields_unordered instead of ordered assertions.

This test verifies several important behaviors:

  1. Independent acknowledgments: Each client receives an acknowledgment for their own increment request
  2. Global count: Both clients observe the same count (2) because increments from all clients contribute to a single counter
  3. Proper routing: Each client receives only their own responses, not responses meant for other clients

The exhaustive simulator will test all possible interleavings of these operations, ensuring the counter behaves correctly regardless of:

  • Which client's increment is processed first
  • Whether increments are processed before or after get requests arrive
  • The relative timing of acknowledgments and responses

Run the test with cargo test -- concurrent_clients to verify the implementation handles concurrent clients correctly.

Next Steps

You've now extended the counter service to handle multiple concurrent clients accessing a shared global counter. Each client can independently send requests and receive responses, while the counter maintains a single global count.

In the next section, you'll extend this to a keyed counter where instead of one global count, you maintain independent counts for different keys. This will introduce new challenges around regrouping streams and managing keyed state.