A Single Counter

Now that you've explored the Hydro template and built a simple echo server, you'll create a more complex stateful service: a counter that can be incremented and queried. This will introduce you to key Hydro concepts for managing asynchronous state and local consistency.

In this section, you'll build a counter service for a single client that maintains state across requests. Along the way, you'll discover how Hydro's type system and simulation testing help you catch subtle distributed bugs before they reach production.

You will learn

  • How to use the Singleton live collection to store application state
  • How to use slices and nondet! markers to read snapshots of asynchronous state
  • How to test complex concurrent scenarios with exhaustive simulation tests
  • How to use atomic streams to enforce local consistency guarantees

A Simple Counter Service

Like the echo server, our counter service will handle requests from a single client. However, the counter service involves multiple request and response types:

  • Increment requests: Add 1 to the counter and acknowledge the operation
  • Get requests: Return the current count
info

Create a new file src/single_client_counter.rs and add mod single_client_counter; to your src/lib.rs.
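For reference, your src/lib.rs might now look like the following (the echo_server module name is an assumption carried over from the previous section):

```rust
// src/lib.rs
mod echo_server; // from the previous section (name assumed)
mod single_client_counter;
```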

The standard pattern in Hydro is to take in a separate parameter for each request type and return a separate stream for each output. Your counter service will take in two Stream parameters for increment and get requests and return a tuple of two Streams for the responses:

pub fn single_client_counter_service<'a>(
    increment_requests: Stream<(), Process<'a, CounterServer>>,
    get_requests: Stream<(), Process<'a, CounterServer>>,
) -> (
    Stream<(), Process<'a, CounterServer>>,    // increment acknowledgments
    Stream<usize, Process<'a, CounterServer>>, // get responses
) {

Maintaining State with Singleton

To track the counter value, you need to accumulate state across incoming requests. Hydro provides the Singleton live collection for storing a single value that can be updated over time.

You can create a singleton by counting the number of increment requests:

let current_count = increment_requests.clone().count();

The count() method consumes the stream and returns a Singleton<usize> that tracks the total number of elements seen so far. Since you want to both count the requests and acknowledge them, you use clone() to create a copy of the stream.
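Conceptually, the resulting singleton behaves like a running total that updates as elements arrive and can be observed at any point. Here is a plain-Rust sketch of that idea (a simplified model, not the Hydro API):

```rust
// Simplified model of a count singleton: state that updates as
// stream elements arrive and can be snapshotted at any time.
struct CountSingleton {
    value: usize,
}

impl CountSingleton {
    fn new() -> Self {
        CountSingleton { value: 0 }
    }

    // Called once per element of the underlying stream.
    fn on_element(&mut self) {
        self.value += 1;
    }

    // Observe the total number of elements seen so far.
    fn snapshot(&self) -> usize {
        self.value
    }
}
```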

For now, let's acknowledge increment requests immediately:

let increment_ack = increment_requests;

Asynchronous State and Slices

Now comes the interesting part: how do you respond to get requests with the current count? The challenge is that current_count is a singleton that updates asynchronously as increment requests arrive, while get_requests is a stream that arrives independently.

Hydro provides the sliced! macro for this purpose. It lets you perform a computation over the versions of several live collections as they exist at a single point in time. How each version is revealed depends on the type of live collection: a Stream is revealed as the batch of new elements at that point in time, and a Singleton as a snapshot of its current state.

let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));

The sliced! macro takes multiple use statements, using syntax inspired by React hooks. Each specifies a live collection to slice along with a non-determinism explanation, and returns the current slice. In this case:

  • request_batch is a batch of get requests (a Stream<()>)
  • count_snapshot is the current value of the counter at the time this batch is processed (a Singleton<usize>)
info

You'll notice the nondet! macro in each use statement. Hydro guarantees eventual determinism: given the same inputs, your program will eventually produce the same outputs. However, some operations involve inherent non-determinism, such as the boundaries of a batch or which snapshot of asynchronous state is observed.

The nondet! macro serves two purposes:

  1. It marks points where non-determinism occurs, making them explicit in your code
  2. It requires you to document why the non-determinism is acceptable for your application

Code that involves nondet! calls for extra scrutiny during code review, as any non-determinism in your application will originate at one of these points. We will see in a moment that there is indeed a bug involved with the non-determinism here!

After declaring the use statements, which must come at the top of the macro invocation, the body of sliced! processes the data:

    request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
};

The cross_singleton method pairs each element of the stream with the singleton value, producing tuples like ((), 5). The map(q!(|(_, count)| count)) call then extracts just the count, turning ((), 5) into 5.
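As a plain-Rust model of the slice body's semantics (not the Hydro API), every get request in the batch is paired with the same snapshot and then projected down to the count:

```rust
// Model of cross_singleton followed by map: every get request in the
// batch is answered with the same count snapshot.
fn respond_to_batch(request_batch: Vec<()>, count_snapshot: usize) -> Vec<usize> {
    request_batch
        .into_iter()
        .map(|req| (req, count_snapshot)) // cross_singleton: pair with the snapshot
        .map(|(_, count)| count)          // map: keep only the count
        .collect()
}
```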

Simulation Testing

Especially in code that involves nondet!, it is important to write simulation tests. When possible, you should write exhaustive tests, since they guarantee coverage of all possible asynchronous executions. Let's write a test for the counter service.

#[test]
fn test_counter_read_after_write() {
    let flow = FlowBuilder::new();
    let process = flow.process();

    let (inc_in_port, inc_requests) = process.sim_input();
    let (get_in_port, get_requests) = process.sim_input();

    let (inc_acks, get_responses) = single_client_counter_service(inc_requests, get_requests);

    let inc_out_port = inc_acks.sim_output();
    let get_out_port = get_responses.sim_output();

    flow.sim().exhaustive(async || {
        inc_in_port.send(());
        inc_out_port.assert_yields([()]).await;
        get_in_port.send(());
        get_out_port.assert_yields_only([1]).await;
    });
}

This test uses Hydro's exhaustive simulator to test a scenario where the client increments the counter and then reads it back. The test sends an increment request, waits for the acknowledgement, then sends a get request and expects to receive a count of 1.
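To get a feel for what exhaustive exploration means, here is a small sketch of the core idea (a simplified model, not Hydro's actual simulator): enumerate every interleaving of independent event sequences, preserving the relative order within each sequence.

```rust
// Enumerate all interleavings of two independent event sequences.
// The relative order within each sequence is preserved.
fn interleavings<T: Clone>(a: &[T], b: &[T]) -> Vec<Vec<T>> {
    if a.is_empty() {
        return vec![b.to_vec()];
    }
    if b.is_empty() {
        return vec![a.to_vec()];
    }
    let mut result = Vec::new();
    // Either the next event comes from sequence a...
    for mut rest in interleavings(&a[1..], b) {
        rest.insert(0, a[0].clone());
        result.push(rest);
    }
    // ...or it comes from sequence b.
    for mut rest in interleavings(a, &b[1..]) {
        rest.insert(0, b[0].clone());
        result.push(rest);
    }
    result
}
```

Even short sequences produce many orderings, which is why a hand-picked schedule can easily miss the race that exhaustive exploration finds.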

If you run this test with cargo test -- single_client_counter, you'll see it fail! The simulator explores different execution orderings and finds one where the get request observes a count of 0 instead of 1:

Running Tick
| let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
| ^ releasing no items
| let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
| ^ releasing snapshot: 0

Running Tick
| let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
| ^ releasing items: [()]
| let count_snapshot = use(current_count, nondet!(/** intentional, based on when the request came in */));
| ^ releasing unchanged snapshot: 0

thread ... panicked at src/single_client_counter.rs:50:
Stream yielded unexpected message: 0

What went wrong? The issue is that acknowledging the increment is asynchronous with respect to snapshotting the counter. Even though the test waits for the acknowledgement before sending the get request, the counter snapshot may be an older version (we see this with the "releasing unchanged snapshot" message).

When slicing a Singleton with use, the only guarantee is that each snapshot is at least the same version as the last snapshot. But this snapshot may lag behind, since changes to a singleton are propagated asynchronously.
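A small model makes this concrete (a sketch of the guarantee, not Hydro internals): if the counter's history of versions is [0, 1] and the reader last observed version 0, then both 0 and 1 are valid next snapshots, which is exactly the failing execution above.

```rust
// Model of the snapshot guarantee: a reader may observe any version at or
// after the last one it saw, but not necessarily the latest.
fn valid_next_snapshots(history: &[usize], last_seen_index: usize) -> Vec<usize> {
    history[last_seen_index..].to_vec()
}
```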

Consistent Snapshots with Atomic

To fix this bug, you need to establish a consistency relationship between the acknowledgements and count snapshots. The problem is that when a client receives an acknowledgement, there's no guarantee about what count snapshot a subsequent get request will observe.

Calling atomic() on a stream (which requires a Tick parameter) enables downstream operations to establish consistency guarantees:

let atomic_tick = increment_requests.location().tick();
let increment_request_processing = increment_requests.atomic(&atomic_tick);

Instead of counting the original stream, you now count using the atomic stream. This produces an atomic singleton that will allow you to take consistent snapshots later. The APIs on an atomic live collection are the same as on a regular live collection, so the counter code is unchanged:

let current_count = increment_request_processing.clone().count();

Now, let's focus on the outputs of the counter service. It needs to release acknowledgements and process get requests, while ensuring that a read-after-write produces a consistent response:

let increment_ack = increment_request_processing.end_atomic();

let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** we never observe batch boundaries */));
    let count_snapshot = use::atomic(current_count, nondet!(/** atomicity guarantees consistency wrt increments */));

    request_batch.cross_singleton(count_snapshot).map(q!(|(_, count)| count))
};

The end_atomic() call is one half of the consistency relationship: it marks the point where the acknowledgements are released to the client. The other half is use::atomic, which takes count snapshots that are consistent with those releases: every snapshot reflects all increments whose acknowledgements have already been released.
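A plain-Rust sketch of the resulting guarantee (a simplified model, not Hydro internals): because the count update and the ack release happen in the same atomic step, any snapshot taken after an ack has been released already includes that increment.

```rust
// Model of atomic increment processing: the ack only becomes visible
// once the count already includes the increment.
struct AtomicCounter {
    count: usize,
}

impl AtomicCounter {
    fn new() -> Self {
        AtomicCounter { count: 0 }
    }

    // Count the increment and release its ack in one atomic step.
    // The ack (the return of this call) is only reachable after
    // the count has been updated.
    fn increment_and_ack(&mut self) {
        self.count += 1;
    }

    // A snapshot taken after an ack reflects that increment.
    fn snapshot(&self) -> usize {
        self.count
    }
}
```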

Run the test again with cargo test -- single_client_counter, and it should pass!

Next Steps

In the next section, you'll extend the counter service to handle multiple concurrent clients, each with their own independent stream of requests. After that, you'll learn how to maintain separate counts for different keys using keyed state.