A Keyed Counter

In the previous section, you built a single counter that tracked a global count. Now you'll extend this to a keyed counter service that maintains independent counts for different keys. This introduces a new challenge: requests arrive keyed by client ID, but you need to store and look up state by key name.

You'll learn how to regroup streams and work with keyed state. Along the way, you will encounter stream types for the first time, which Hydro uses to enforce distributed correctness guarantees:

You will learn

  • How to use the KeyedSingleton live collection to store keyed state
  • How Hydro uses stream types to track message ordering and enforce distributed safety
  • How to regroup keyed streams for routing messages and looking up state

Handling Multiple Keys

Instead of a single global counter, you'll now support multiple independent counters identified by string keys. Each client can specify which key to increment or query.

info

Create a new file src/keyed_counter.rs and add mod keyed_counter; to your src/lib.rs.

Your keyed counter service will handle requests where each client specifies a key name:

pub fn keyed_counter_service<'a>(
    increment_requests: KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
    get_requests: KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
) -> (
    KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
    KeyedStream<u32, (String, usize), Process<'a, CounterServer>, Unbounded>,
)

The function signature is similar to the single counter, but now the value type is String (the key name) instead of (). Get responses return tuples of (String, usize) containing both the key name and its count.

Maintaining Keyed State

In the single counter, you used a Singleton to store a single value that updates over time. Now you need to store multiple values, one for each key. Hydro provides the KeyedSingleton live collection for this purpose. A KeyedSingleton<K, V> associates each key of type K with a value of type V, like a map. Just like a Singleton, the values update over time as new data arrives.

Just like in the single counter, you'll need to establish consistency guarantees between increment acknowledgements and count snapshots. You start by marking the increment requests as atomic:

let atomic_tick = increment_requests.location().tick();
let increment_request_processing = increment_requests.atomic(&atomic_tick);

To track counts for multiple keys, you need to regroup the increment requests. Currently, they're keyed by client ID, but you need to count by key name. Regrouping a keyed stream is a common pattern in Hydro. You convert the keyed stream to entries (tuples of (client_id, key_name)), then map to swap the order to (key_name, ()). The into_keyed() method converts this back into a keyed stream, now keyed by key name:

let current_count = increment_request_processing
    .clone()
    .entries()
    .map(q!(|(_, key)| (key, ())))
    .into_keyed()
    .value_counts();

The value_counts() method creates a KeyedSingleton<String, usize> that tracks the count for each key.
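If the semantics of value_counts() are unfamiliar, its effect on a batch of regrouped entries can be modeled with a plain HashMap. This is a conceptual sketch only, not Hydro's implementation; the helper name value_counts_model is made up for illustration:

```rust
use std::collections::HashMap;

/// Conceptual model of `value_counts()`: given a batch of increment
/// requests already regrouped to `(key_name, ())`, produce the number
/// of occurrences of each key.
fn value_counts_model(entries: &[(&str, ())]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for (key, _) in entries {
        // Each occurrence of a key bumps that key's count by one.
        *counts.entry(key.to_string()).or_insert(0) += 1;
    }
    counts
}

fn main() {
    let counts = value_counts_model(&[("a", ()), ("b", ()), ("a", ())]);
    assert_eq!(counts["a"], 2);
    assert_eq!(counts["b"], 1);
}
```

Unlike this one-shot batch model, the real KeyedSingleton keeps updating as new increments stream in.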

tip

Just as Stream has methods like count() to create a singleton, KeyedStream has methods like value_counts() to create a keyed singleton. Other methods include fold() for custom aggregations.

Finally, you acknowledge the increments by ending the atomic block:

let increment_ack = increment_request_processing.end_atomic();

Now you have a KeyedSingleton that maintains independent counts for each key, and acknowledgements are sent back to clients as increments are processed.

Looking Up Keyed State

Get requests arrive keyed by client ID, but you need to look them up by key name. This requires regrouping:

let requests_regrouped = get_requests
    .entries()
    .map(q!(|(cid, key)| (key, cid)))
    .into_keyed();

This swaps the key and value: (client_id, key_name) becomes (key_name, client_id). Now the stream is keyed by key name, matching the structure of current_count. With both get requests and counts keyed by key name, you can perform lookups using sliced!:

let get_lookup = sliced! {
    let request_batch = use(requests_regrouped, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));

    count_snapshot.get_many_if_present(request_batch)
};

As in the previous tutorial, use::atomic ensures that count snapshots are consistent with increment acknowledgements. The get_many_if_present() method looks up the count for each key in the request batch, returning a keyed stream where each element contains the count and the client ID.

info

The get_many_if_present() method only returns entries for keys that exist in the keyed singleton. If a client queries a key that has never been incremented, no response is generated.
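These lookup semantics can be modeled outside Hydro as a HashMap probe that silently drops requests for absent keys. This is a sketch of the behavior, not the real operator; the helper lookup_if_present is a made-up name:

```rust
use std::collections::HashMap;

/// Conceptual model of `get_many_if_present`: for each `(key, client)`
/// request, emit `(key, (count, client))` only when the key exists in
/// the counts map. Requests for absent keys produce no output at all.
fn lookup_if_present(
    counts: &HashMap<String, usize>,
    requests: &[(String, u32)],
) -> Vec<(String, (usize, u32))> {
    requests
        .iter()
        .filter_map(|(key, client)| {
            counts.get(key).map(|&count| (key.clone(), (count, *client)))
        })
        .collect()
}

fn main() {
    let counts = HashMap::from([("abc".to_string(), 1)]);
    let requests = vec![("abc".to_string(), 1), ("missing".to_string(), 2)];
    let out = lookup_if_present(&counts, &requests);
    // Only the key that exists produces a response; "missing" is dropped.
    assert_eq!(out, vec![("abc".to_string(), (1, 1))]);
}
```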

Finally, you need to regroup the responses by client ID so each client receives their own responses:

let get_response = get_lookup
    .entries()
    .map(q!(|(key, (count, client))| (client, (key, count))))
    .into_keyed();

The lookup results are keyed by key name with values (count, client_id). You transform this to (client_id, (key_name, count)) and regroup by client ID. This completes the data flow: requests arrive organized by client, get processed by key, and responses are routed back to the appropriate clients.
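The whole data flow can be traced on plain collections: count by key, look up by key, then group responses by client. The sketch below is a batch-oriented conceptual model (the function name pipeline is hypothetical); Hydro executes the same shape incrementally over live streams:

```rust
use std::collections::HashMap;

/// End-to-end model of the keyed-counter flow on plain collections:
/// increments are counted by key, gets are looked up by key, and
/// responses are regrouped by client ID.
fn pipeline(
    increments: &[(u32, &'static str)],
    gets: &[(&'static str, u32)],
) -> HashMap<u32, Vec<(&'static str, usize)>> {
    // Regroup increments from (client, key) to counts per key,
    // mirroring `value_counts()`.
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for (_client, key) in increments.iter().copied() {
        *counts.entry(key).or_insert(0) += 1;
    }

    // Look up each (key, client) request and route the response
    // back to its client, mirroring the final regroup.
    let mut responses: HashMap<u32, Vec<(&'static str, usize)>> = HashMap::new();
    for (key, client) in gets.iter().copied() {
        if let Some(&count) = counts.get(key) {
            responses.entry(client).or_default().push((key, count));
        }
    }
    responses
}

fn main() {
    let responses = pipeline(&[(1, "a"), (2, "a"), (1, "b")], &[("a", 7), ("b", 8)]);
    assert_eq!(responses[&7], vec![("a", 2)]);
    assert_eq!(responses[&8], vec![("b", 1)]);
}
```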

Ordering Guarantees in the Type System

If you try to compile this code now, you'll encounter a type error:

error[E0308]: mismatched types
  --> src/keyed_counter.rs:41:21
   |
41 |     (increment_ack, get_response)
   |                     ^^^^^^^^^^^^ expected `KeyedStream<u32, (String, usize), L>`,
   |                                  found `KeyedStream<u32, ..., ..., ..., ...>`
   |
   = note: expected struct `KeyedStream<_, _, _, _, TotalOrder, _>`
              found struct `KeyedStream<_, _, _, _, NoOrder, _>`

This error reveals something unique about Hydro: the type system reasons about distributed system properties like out-of-order messages and retries. The get_many_if_present() method performs a hash join between get requests and the keyed singleton. Hash joins process elements based on hash values rather than arrival order, so they cannot preserve ordering. The type system captures this by marking the result as NoOrder.

In distributed systems, properties like ordering guarantees are critical for correctness. Consider what could go wrong if you assumed responses arrived in order:

  • A client sends requests for keys "a", "b", "c" in that order
  • Network delays or retries can cause responses to arrive out of order
  • If the client incorrectly assumes ordering, it might associate counts with the wrong keys
  • Without type-level tracking, this bug could silently corrupt application logic
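Because each response carries its key name, a client can match responses to requests by key rather than by position, which stays correct under any arrival order. A sketch of that client-side matching (the helper match_by_key is a made-up name, not part of Hydro):

```rust
use std::collections::HashMap;

/// Index responses by their key name, so correctness does not depend
/// on the order in which they arrived.
fn match_by_key(responses: &[(&'static str, usize)]) -> HashMap<&'static str, usize> {
    responses.iter().copied().collect()
}

fn main() {
    // The client requested "a", "b", "c" in order, but the responses
    // arrive shuffled (NoOrder).
    let arrived = [("c", 3usize), ("a", 1), ("b", 2)];
    let by_key = match_by_key(&arrived);
    // Matching by key, not position, recovers the right counts.
    assert_eq!(by_key["a"], 1);
    assert_eq!(by_key["b"], 2);
    assert_eq!(by_key["c"], 3);
}
```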

Hydro captures distributed system properties in the type system using stream types like NoOrder. This means:

  • Compile-time safety: You can't accidentally treat unordered data as ordered
  • Explicit contracts: Function signatures document ordering guarantees, making APIs self-documenting
  • Optimization opportunities: The compiler knows when it's safe to use more efficient unordered operations

To fix the type error, you need to explicitly acknowledge that get responses have no ordering guarantee by adding NoOrder to the return type:

use hydro_lang::live_collections::stream::NoOrder;

pub fn keyed_counter_service<'a>(
    increment_requests: KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
    get_requests: KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
) -> (
    KeyedStream<u32, String, Process<'a, CounterServer>, Unbounded>,
    KeyedStream<u32, (String, usize), Process<'a, CounterServer>, Unbounded, NoOrder>,
)

This makes the lack of ordering guarantees explicit in the API contract. Callers of this function will know that get responses may arrive in any order, which is acceptable for this use case since each response contains the key name, allowing clients to match responses to their requests.

Testing the Keyed Counter

The test follows the same pattern as the single counter. Because the simulator currently only supports Stream inputs and outputs, use .into_keyed() and .entries() to convert between Stream and KeyedStream at the boundaries.

#[test]
fn test_counter_read_after_write() {
    let flow = FlowBuilder::new();
    let process = flow.process::<CounterServer>();

    let (inc_in_port, inc_requests) = process.sim_input();
    let inc_requests = inc_requests.into_keyed();

    let (get_in_port, get_requests) = process.sim_input();
    let get_requests = get_requests.into_keyed();

    let (inc_acks, get_responses) = keyed_counter_service(inc_requests, get_requests);

    let inc_out_port = inc_acks.entries().sim_output();
    let get_out_port = get_responses.entries().sim_output();

    flow.sim().exhaustive(async || {
        inc_in_port.send((1, "abc".to_string()));
        inc_out_port
            .assert_yields_unordered([(1, "abc".to_string())])
            .await;
        get_in_port.send((1, "abc".to_string()));
        get_out_port
            .assert_yields_only_unordered([(1, ("abc".to_string(), 1))])
            .await;
    });
}

This test verifies that a client can increment a specific key ("abc") and then read back the correct count. The atomic consistency guarantees ensure that after receiving an acknowledgement, a subsequent get request observes the updated count.

Notice that the test uses assert_yields_unordered and assert_yields_only_unordered instead of the ordered variants from the previous tutorial. This is necessary because calling KeyedStream::entries() flattens the elements with non-deterministic interleaving into a Stream<..., NoOrder>. The _unordered assertion methods account for this by checking that the expected elements appear without requiring a specific order.

Run the test with cargo test -- keyed_counter to verify the implementation works correctly across all possible execution orderings.

Next Steps

You've now built a keyed counter service that tracks independent counts for multiple keys. The key insight is that keyed streams can be regrouped by different keys, allowing you to organize data by client ID for routing while storing state by key name for lookups.

In the next section, you'll learn how to partition this service across a cluster of machines using Cluster locations and MemberId routing.