# A Single Counter
Now that you've explored the Hydro template and built a simple echo server, you'll create a more complex stateful service: a counter that can be incremented and queried by multiple concurrent clients. This will introduce you to key Hydro concepts for managing asynchronous state and local consistency.
In this section, you'll build a counter service that maintains state across requests and handles concurrent reads and writes. Along the way, you'll discover how Hydro's type system and simulation testing help you catch subtle distributed bugs before they reach production.
## You will learn

- How to use the `Singleton` live collection to store application state
- How to use slices and `nondet!` markers to read snapshots of asynchronous state
- How to test complex concurrent scenarios with exhaustive simulation tests
- How to use atomic streams to enforce local consistency guarantees
## Handling Concurrent Clients
The echo server handled requests from a single client, but our counter service will support requests from concurrent clients. Instead of a single input and output stream, your service will need to handle several streams of requests and responses, one for each client.
To do this, you will use `KeyedStream`, a live collection that captures independent streams identified by a key. To handle dynamic clients, the key will be the client ID (with type `u32`). This allows you to track which client sent each request and ensure that responses are routed back to the correct client.
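As a mental model, you can think of a keyed stream as an independent stream per key, where order is only meaningful among requests that share a key. The sketch below is plain Rust (not the Hydro `KeyedStream` type), and `group_by_client` is a hypothetical helper invented for illustration:

```rust
use std::collections::HashMap;

/// Group interleaved (client_id, request) pairs by client.
/// (Hypothetical helper for illustration only.)
fn group_by_client(requests: &[(u32, &'static str)]) -> HashMap<u32, Vec<&'static str>> {
    let mut keyed: HashMap<u32, Vec<&'static str>> = HashMap::new();
    for &(client_id, request) in requests {
        keyed.entry(client_id).or_default().push(request);
    }
    keyed
}

fn main() {
    let keyed = group_by_client(&[(1, "inc"), (2, "get"), (1, "get")]);
    // Each client's requests keep their relative order, and the key lets
    // responses be routed back to the right client.
    assert_eq!(keyed[&1], vec!["inc", "get"]);
    assert_eq!(keyed[&2], vec!["get"]);
}
```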
Create a new file `src/single_counter.rs` and add `mod single_counter;` to your `src/lib.rs`.
Another important difference from the echo server is that the counter service involves multiple request and response types:
- Increment requests: Add 1 to the counter and acknowledge the operation
- Get requests: Return the current count
The standard pattern in Hydro is to take in a separate parameter for each request type and return a separate stream for each output. Your counter service will take in two `KeyedStream` parameters for increment and get requests and return a tuple of two `KeyedStream`s for the responses:
```rust
pub fn single_counter_service<'a>(
    increment_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
    get_requests: KeyedStream<u32, (), Process<'a, CounterServer>>,
) -> (
    KeyedStream<u32, (), Process<'a, CounterServer>>, // increment acknowledgments
    KeyedStream<u32, usize, Process<'a, CounterServer>>, // get responses
) {
```
## Asynchronous State and Slices
To track the counter value, you need to accumulate state across incoming requests. Hydro provides the `Singleton` live collection for storing a single value that can be updated over time.

You can create a singleton by counting the number of increment requests:

```rust
let current_count = increment_requests.clone().values().count();
```
Because the service is accumulating increments across all clients, the first step is to gather all the increments into a single stream, which you can do with `.values()`. This returns an unordered `Stream` containing just the `()` values across all clients. Then, the `count()` method consumes the stream and returns a `Singleton<usize>` that tracks the total number of elements seen so far. Since you want to both count the requests and acknowledge them, you use `clone()` to create a copy of the stream.
Hydro has already done some distributed safety checking for us! Because requests from different clients arrive in a non-deterministic order, `.values()` returns an unordered stream. This restricts the APIs we can call: `.count()` is allowed because it does not rely on element order (it is commutative), but `.last()` would not compile.
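To see why this restriction matters, here is a toy illustration in plain Rust (not Hydro): two legal interleavings of the same requests agree on the count but disagree on the final element, so an order-dependent operator would make the output non-deterministic:

```rust
fn main() {
    // The same three requests, arriving in two different orders:
    let interleaving_a = vec![(1u32, "inc"), (2, "inc"), (1, "get")];
    let interleaving_b = vec![(1u32, "inc"), (1, "get"), (2, "inc")];

    // Counting is commutative: every interleaving yields the same answer.
    assert_eq!(interleaving_a.len(), interleaving_b.len());

    // The "last" element depends on arrival order, so relying on it would
    // make the program's output non-deterministic.
    assert_ne!(interleaving_a.last(), interleaving_b.last());
}
```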
For now, let's acknowledge increment requests immediately:

```rust
let increment_ack = increment_requests;
```
Now comes the interesting part: how do you respond to get requests with the current count? The challenge is that current_count is a singleton that updates asynchronously as increment requests arrive, while get_requests is a stream that arrives independently.
Hydro provides the `sliced!` macro for this purpose. It allows you to perform computations that rely on a version of several live collections at some point in time. The way this version is revealed depends on the type of live collection. A `Stream` will be revealed as a batch of new elements at that point in time, and a `Singleton` will be revealed as a snapshot of the state.

```rust
let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
```
The `sliced!` macro takes multiple `use` statements, using syntax inspired by React hooks. Each specifies a live collection to slice along with a non-determinism explanation, and returns the current slice. In this case:

- `request_batch` is a batch of get requests (a `KeyedStream<u32, ()>`)
- `count_snapshot` is the current value of the counter at the time this batch is processed (a `Singleton<usize>`)
You'll notice the `nondet!` macro in each `use` statement. Hydro guarantees eventual determinism: given the same inputs, your program will eventually produce the same outputs. However, some operations involve inherent non-determinism, such as the boundaries of a batch or which snapshot of asynchronous state is observed.

The `nondet!` macro serves two purposes:
- It marks points where non-determinism occurs, making them explicit in your code
- It requires you to document why the non-determinism is acceptable for your application
Code that involves `nondet!` calls for extra scrutiny during code review, as any non-determinism in your application will originate at one of these points. We will see in a moment that there is indeed a bug involving the non-determinism here!
After declaring the `use` statements, which must come at the top of the macro invocation, the body of `sliced!` processes the data:

```rust
    request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
};
```
The `cross_singleton` method pairs each element in the keyed stream with the singleton value, creating tuples like `((), 5)` for the values while preserving the original key. So if client 1 sends a get request, you'll have a keyed stream element with key `1` and value `((), 5)`.

Finally, you use `map` to transform just the value part of each keyed stream element, extracting the count from the tuple. Crucially, `map` on a keyed stream transforms only the values while keeping the keys unchanged. So `map(q!(|(_, count)| count))` converts the value from `((), 5)` to just `5`, but the key (client ID) remains attached. This ensures that each client receives the appropriate responses.
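The data flow above can be sketched in plain Rust (these are not the Hydro operators themselves, just a model of their behavior on one batch): every value is paired with the snapshot, then rewritten, while the key survives untouched:

```rust
fn main() {
    let request_batch: Vec<(u32, ())> = vec![(1, ()), (2, ())]; // keyed get requests
    let count_snapshot: usize = 5; // the singleton's value at this slice

    // cross_singleton: attach the snapshot to each value, keeping the key.
    let crossed: Vec<(u32, ((), usize))> = request_batch
        .into_iter()
        .map(|(key, value)| (key, (value, count_snapshot)))
        .collect();

    // map on a keyed stream transforms only the value part.
    let responses: Vec<(u32, usize)> = crossed
        .into_iter()
        .map(|(key, (_, count))| (key, count))
        .collect();

    // Both clients receive the count, routed by their original keys.
    assert_eq!(responses, vec![(1, 5), (2, 5)]);
}
```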
## Simulation Testing

Especially in code that involves `nondet!`, it is important to write simulation tests. When possible, you should write exhaustive tests, since they guarantee coverage of all possible asynchronous executions. Let's write a test for the counter service.

First, set up the test infrastructure. Because the simulator only supports `Stream` as inputs and outputs, you use `.into_keyed()` to convert the inputs into a `KeyedStream` and `.entries()` to convert the outputs back to a regular `Stream`.
```rust
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    #[test]
    fn test_counter_read_after_write() {
        let flow = FlowBuilder::new();
        let process = flow.process();
        let external = flow.external::<()>();

        let (inc_in_port, inc_requests) = process.source_external_bincode(&external);
        let inc_requests = inc_requests.into_keyed();
        let (get_in_port, get_requests) = process.source_external_bincode(&external);
        let get_requests = get_requests.into_keyed();

        let (inc_acks, get_responses) = single_counter_service(inc_requests, get_requests);

        let inc_out_port = inc_acks.entries().send_bincode_external(&external);
        let get_out_port = get_responses.entries().send_bincode_external(&external);
```
Now you can use Hydro's exhaustive simulator to test a scenario where a client increments the counter and then reads it back. This test sends an increment request from client 1, waits for the acknowledgement, then sends a get request and expects to receive a count of 1.
Because each group in a keyed stream has an independent order of elements, calling `.entries()` results in an unordered stream of tuples. Therefore, you cannot make assertions about the order of outputs, and must use the `assert_yields_unordered` method instead:
```rust
        flow.sim().exhaustive(async |mut instance| {
            let inc_in_port = instance.connect(&inc_in_port);
            let get_in_port = instance.connect(&get_in_port);
            let mut inc_out_port = instance.connect(&inc_out_port);
            let get_out_port = instance.connect(&get_out_port);
            instance.launch();

            inc_in_port.send((1, ()));
            inc_out_port.assert_yields_unordered([(1, ())]).await;

            get_in_port.send((1, ()));
            get_out_port.assert_yields_only_unordered([(1, 1)]).await;
        });
    }
}
```
If you run this test with `cargo test -- single_counter`, you'll see it fail! The simulator explores different execution orderings and finds one where the get request observes a count of 0 instead of 1:
```text
Running Tick
|   let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
|       ^ releasing no items
|   let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
|       ^ releasing snapshot: 0
Running Tick
|   let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
|       ^ releasing items: [(1, ())]
|   let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
|       ^ releasing unchanged snapshot: 0

thread panicked at:
Stream yielded unexpected message: (1, 0)
```
What went wrong? The issue is that acknowledging the increment is asynchronous with respect to snapshotting the counter. Even though the test waits for the acknowledgement before sending the get request, the counter snapshot may be an older version (we see this with the "releasing unchanged snapshot" message).
When slicing a `Singleton` with `use`, the only guarantee is that each snapshot is at least the same version as the last snapshot. But this snapshot may lag behind, since changes to a singleton are propagated asynchronously.
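The guarantee and the gap can be made concrete with a toy model in plain Rust (not Hydro): the sequence of observed snapshots never goes backwards, yet a snapshot taken after an acknowledgement may still miss the acknowledged increment:

```rust
fn main() {
    // Snapshots observed by successive get requests in one legal execution:
    let observed_snapshots = [0usize, 0, 1]; // the second read still sees 0
    let acknowledged_increments = 1; // the client has already seen this many acks

    // Guarantee that DOES hold: snapshots are monotonically non-decreasing.
    assert!(observed_snapshots.windows(2).all(|w| w[0] <= w[1]));

    // Guarantee that does NOT hold without atomicity: a snapshot taken after
    // an acknowledgement may still be missing that increment.
    assert!(observed_snapshots.iter().any(|&s| s < acknowledged_increments));
}
```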
## Consistent Snapshots with Atomic

To fix this bug, you need to establish a consistency relationship between the acknowledgements and count snapshots. The problem is that when a client receives an acknowledgement, there's no guarantee about what count snapshot a subsequent get request will observe.

Calling `atomic()` on a stream (which requires a `Tick` parameter) enables downstream operations to establish consistency guarantees:
```rust
let atomic_tick = increment_requests.location().tick();
let increment_request_processing = increment_requests.atomic(&atomic_tick);
```
Instead of counting the original stream, you now count using the atomic stream. This produces an atomic singleton that will allow you to take consistent snapshots later. The APIs on an atomic live collection are the same as on a regular live collection, so the counter code is unchanged:
```rust
let current_count = increment_request_processing.clone().values().count();
```
Now, let's focus on the outputs of the counter service. It needs to release acknowledgements and process get requests, while ensuring that a read-after-write produces a consistent response:
```rust
let increment_ack = increment_request_processing.end_atomic();

let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));

    request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
};
```
The `end_atomic()` call is one half of the consistency relationship. It marks the point where the acknowledgements are released to the client. The other half is `use::atomic`, which snapshots the count in a way that is consistent with respect to those acknowledgements. The `use::atomic` ensures that the count snapshot reflects all increments whose acknowledgements have been released.
Run the test again with `cargo test -- single_counter`, and it should pass!
## Next Steps
In the next sections, you will expand your single counter into a keyed counter service. You will add support for storing several keys, learning how to re-group streams and store keyed state. Then, to complete this tutorial series, you will partition your keyed store across a cluster of machines and learn how to abstract distributed components in Hydro.
The next steps of this tutorial are currently under development. In the meantime, you may want to look at the framework docs.