Atomic Collections
When building stateful services, you often need to ensure consistency between different outputs. For example, in a counter service, when a client sends an increment request and receives an acknowledgement, a subsequent get request from that client should observe the incremented count.
Without special handling, this guarantee doesn't hold automatically. The acknowledgement might be sent before the count update is visible to get requests, leading to stale reads.
The Problem: Asynchronous State Updates
Consider a simple counter service that handles increment and get requests:
```rust
let current_count = increment_requests.clone().count();

// Acknowledgements are released as soon as increment requests arrive.
let increment_ack = increment_requests.map(q!(|_| "ok"));

let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** ... */));
    // This snapshot may not yet include an increment that was already acknowledged.
    let count_snapshot = use(current_count, nondet!(/** ... */));
    request_batch.cross_singleton(count_snapshot)
};
```
This code has a subtle bug. The increment_ack stream releases acknowledgements as soon as increment requests arrive. But the count_snapshot in sliced! might observe an older version of the count—one that doesn't include the increment that was just acknowledged.
The animation shows the problem: an increment request arrives, the acknowledgement is sent immediately, but when a get request arrives, it observes a stale snapshot of the count (0 instead of 1).
The Solution: Atomic Collections
Hydro provides atomic collections to establish consistency relationships between outputs and state snapshots. By marking a live collection as atomic, you create a synchronization point that downstream operations can reference.
Atomic collections work by establishing a happens-before relationship:

- When you call atomic(&tick) on a live collection, all downstream computations are associated with that tick
- When you call end_atomic(), the outputs are held until all computations in the tick complete
- When you use use::atomic in a sliced! block, the snapshot is guaranteed to reflect all updates from the associated end_atomic
This means that if a client receives an acknowledgement released by end_atomic(), any subsequent use::atomic snapshot will include the effects of that acknowledged operation.

Here is the counter service rewritten to use atomic collections:
```rust
let atomic_tick = process.tick();

// Mark the increment stream as atomic, associating downstream work with the tick.
let increment_request_processing = increment_requests.atomic(&atomic_tick);
let current_count = increment_request_processing.clone().count();

// Acknowledgements are held until the computations in the tick complete.
let increment_ack = increment_request_processing.end_atomic();
```
The key changes are:

- atomic(&tick): Marks the live collection as atomic, associating it with a tick
- end_atomic(): Releases the acknowledgements and establishes a consistency boundary
Now, when you snapshot the count with use::atomic, you're guaranteed to observe a version that's consistent with the released acknowledgements:
```rust
let get_response = sliced! {
    let request_batch = use(get_requests, nondet!(/** ... */));
    // Atomic snapshot: reflects every increment whose acknowledgement has been released.
    let count_snapshot = use::atomic(current_count, nondet!(/** ... */));
    request_batch.cross_singleton(count_snapshot)
};
```
The animation shows the corrected behavior: the increment is processed, the count is updated, and only then is the acknowledgement released. When the get request arrives, it observes the updated count.
Atomic vs Non-Atomic Snapshots
The sliced! macro supports two styles of snapshots:
| Style | Syntax | Guarantee |
|---|---|---|
| Non-atomic | use(collection, nondet!(...)) | Snapshot is at least as recent as the previous snapshot |
| Atomic | use::atomic(collection, nondet!(...)) | Same as above and reflects all updates released by end_atomic |
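To make the non-atomic case concrete, here is a minimal sketch of an informational read built on the same counter pipeline; metrics_requests and metrics_response are hypothetical names, not part of the original example.

```rust
// Sketch only: an informational read where a slightly stale count is acceptable,
// so a plain (non-atomic) snapshot is enough.
let metrics_response = sliced! {
    let metrics_batch = use(metrics_requests, nondet!(/** ... */));
    // Plain use: at least as recent as the previous snapshot, but it may not yet
    // reflect increments whose acknowledgements were just released.
    let count_snapshot = use(current_count, nondet!(/** ... */));
    metrics_batch.cross_singleton(count_snapshot)
};
```

The client-facing get_response keeps the use::atomic snapshot shown earlier, since it must observe acknowledged increments.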
Use non-atomic snapshots when:
- You don't need read-after-write consistency
- The snapshot is for informational purposes (logging, metrics)
- Stale reads are acceptable for your use case
Use atomic snapshots when:
- Clients expect read-after-write consistency
- You're implementing transactions or other consistency protocols
- Correctness depends on observing the effects of acknowledged operations
Testing Atomicity
The Hydro simulator is essential for testing atomic code. It explores different execution orderings to find bugs where atomicity guarantees are violated:
```rust
#[test]
fn test_read_after_write() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    // ... setup code ...

    flow.sim().exhaustive(|sim| async move {
        // Send increment
        increment_input.send(()).await;

        // Wait for acknowledgement
        increment_ack_output.assert_yields(()).await;

        // Get should observe the increment
        get_input.send(()).await;
        get_output.assert_yields(1).await;
    });
}
```
If your code has an atomicity bug, the simulator will find an execution where the get request observes a stale count, causing the test to fail.
Limitations
Atomic collections have important limitations that require special care:
- Single location only: Atomic guarantees only apply within a single process or cluster member. They cannot span multiple locations.
- No distributed atomicity: If you need atomicity across multiple machines, you must implement a distributed coordination protocol (such as two-phase commit or Paxos).
- Performance implications: Atomic collections introduce synchronization points that can affect throughput. Use them only when consistency is required (see the sketch below for one way to limit their scope).
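As a rough sketch of the last point, and not part of the original example, one way to limit the cost is to keep only the consistency-critical path inside the atomic region; increment_metrics is a hypothetical informational output, and the actual performance impact depends on the Hydro runtime.

```rust
// Sketch only: scope the atomic region to the consistency-critical path.
let atomic_tick = process.tick();

// Informational count derived from the raw stream, outside the atomic region;
// increment_metrics is a hypothetical output that tolerates stale values.
let increment_metrics = increment_requests.clone().count();

// Consistency-critical path: the count backing use::atomic snapshots and the
// acknowledgements go through the atomic region.
let increments = increment_requests.atomic(&atomic_tick);
let current_count = increments.clone().count();
let increment_ack = increments.end_atomic();
```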