# Keyed Streams
Keyed streams are a powerful abstraction for grouping stream elements by keys, enabling efficient parallel processing while maintaining ordering guarantees within each group. They represent streaming data partitioned into logical groups, where each group is identified by a key of type `K` and contains values of type `V`.
The key insight behind keyed streams is that while the order of keys (groups) may be non-deterministic, the order of elements within each group can remain deterministic. This allows for asynchronous processing across different keys while preserving important guarantees within each group.
Think of a keyed stream as a collection of independent sub-streams, each identified by a unique key. Messages with the same key are guaranteed to be processed in order relative to each other, but messages with different keys can be processed concurrently and may arrive in any order. This pattern is common in distributed systems where you need to process user sessions (keyed by user ID), handle transactions (keyed by account ID), aggregate metrics (keyed by metric name), or route messages (keyed by destination).
Any stream of tuples `(K, V)` can be converted into a keyed stream using the `into_keyed()` method:
```rust
let data = process.source_iter(q!(vec![
    ("user1", "login"),
    ("user2", "click"),
    ("user1", "purchase"),
    ("user2", "logout")
]));

let keyed: KeyedStream<&str, &str, _, _, _> = data.into_keyed();
```
The resulting keyed stream groups elements by their first tuple element (the key), while preserving the original ordering within each group.
## Ordering and Determinism
Keyed streams differ from regular streams in that they capture a partial ordering of key-value pairs. Within each group, values may have a deterministic ordering, but there is no ordering of values across different groups. Just like streams, keyed streams have an `Order` type parameter that indicates whether the elements in each group have a deterministic order (`TotalOrder`) or not (`NoOrder`). When the type parameter is omitted, it defaults to `TotalOrder` for brevity.
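To make the guarantee explicit, you can spell out the `Order` parameter instead of leaving it inferred. A minimal sketch, assuming `Order` is the final type parameter of `KeyedStream` (matching the annotation shown earlier):

```rust
// Elements within each group keep their source order, so the groups are
// TotalOrder; the annotation is optional and normally inferred.
let ordered: KeyedStream<&str, i32, _, _, TotalOrder> = process
    .source_iter(q!(vec![("a", 1), ("a", 2), ("b", 3)]))
    .into_keyed();
```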
Keyed streams support familiar transformation operations like `map` and `filter` (full API) that operate on the values within each group, and thus preserve any ordering guarantees. Each key maintains its own independent processing pipeline: `user1` events are transformed and filtered separately from `user2` events:
```rust
let events = process.source_iter(q!(vec![
    ("user1", "login"),
    ("user2", "click"),
    ("user1", "purchase"),
    ("user2", "logout"),
    ("user1", "click")
])).into_keyed();

// Transform events to uppercase and filter for important actions
let important_events = events
    .map(q!(|event| event.to_uppercase()))
    .filter(q!(|event| event == "LOGIN" || event == "PURCHASE"));

important_events.entries()
// [("user1", "LOGIN"), ("user1", "PURCHASE")] in any order
```
Keyed streams are most commonly used when dealing with network requests originating from external clients or a cluster of machines. When receiving data, you will receive a `KeyedStream<ID, Value, ...>`, which captures the determinism guarantees within each source and the non-determinism across sources. Similarly, when sending data, you will want to emit a `KeyedStream<ID, Value, ...>` to capture the independent streams of outputs going to each destination. For more on these network APIs, see Locations / Clusters / Networking.
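As a hedged sketch of the receiving side (the `Worker`/`Leader` roles and the exact inferred types are illustrative assumptions here, not the definitive API):

```rust
// Each cluster member sends its local values to a single process, which
// receives them keyed by the sending member's ID: per-sender order is
// preserved, while different senders remain independent groups.
struct Worker {}
struct Leader {}

let workers = flow.cluster::<Worker>(); // `flow` is the flow builder
let leader = flow.process::<Leader>();

let received = workers
    .source_iter(q!(vec![1, 2, 3]))
    .send_bincode(&leader); // a KeyedStream keyed by the sender's member ID
```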
## Flattening Operations
Keyed streams provide two methods to convert back to regular streams. The `.entries()` method returns a stream of `(K, V)` tuples, while `.values()` returns a stream of just the values, discarding keys. Note that both of these APIs return a stream with `NoOrder` ordering, because they interleave elements across key groups in non-deterministic order. This means that downstream logic must tolerate this unknown interleaving of elements appropriately.
```rust
let keyed = process
    .source_iter(q!(vec![("A", 1), ("B", 2), ("A", 3)]))
    .into_keyed();

let entries = keyed.clone().entries(); // Stream<(K, V), _, _, NoOrder>
// [("A", 1), ("B", 2), ("A", 3)] in any order

let values = keyed.values(); // Stream<V, _, _, NoOrder>
// [1, 2, 3] in any order
```
## Aggregation
One of the most powerful features of keyed streams is the ability to perform aggregations within each key group. The `fold` operation accumulates values for each key independently, maintaining separate state for each key while preserving ordering guarantees within each group:
```rust
let purchases = process.source_iter(q!(vec![
    ("alice", 10),
    ("bob", 5),
    ("alice", 15),
    ("bob", 8),
    ("alice", 3)
])).into_keyed();

// Calculate total spending per user
purchases.fold(q!(|| 0), q!(|acc, amount| *acc += amount))
// { "alice": 28, "bob": 13 }
```
Keyed streams support different fold variants based on ordering and retry guarantees. The basic `fold` requires `TotalOrder` within groups for order-dependent aggregation, while `fold_commutative` works with `NoOrder` and requires commutative operations. For handling potential duplicates, `fold_idempotent` requires idempotent operations, and `fold_commutative_idempotent` is the most flexible, handling both unordered and duplicate messages:
```rust
let health_checks = process.source_iter(q!(vec![
    ("server1", false),
    ("server2", false),
    ("server1", true), // had failure
])).into_keyed();

// Detect a failure (commutative and idempotent)
health_checks.fold_commutative_idempotent(
    q!(|| false),
    q!(|acc, check| *acc |= check)
)
// { "server1": true, "server2": false }
```
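When the groups are `NoOrder`, the order-sensitive `fold` is unavailable, but a commutative aggregation still works. A small sketch: round-tripping through `entries()` deliberately discards ordering, and `fold_commutative` still produces a deterministic sum:

```rust
let unordered = process
    .source_iter(q!(vec![("alice", 10), ("bob", 5), ("alice", 15)]))
    .into_keyed()
    .entries()     // Stream<(&str, i32), _, _, NoOrder>
    .into_keyed(); // groups are now NoOrder

// Addition is commutative, so the result is deterministic despite the
// unknown interleaving of elements
unordered.fold_commutative(q!(|| 0), q!(|acc, amount| *acc += amount))
// { "alice": 25, "bob": 5 }
```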
This makes keyed streams particularly well-suited for distributed aggregations where you need to handle the realities of network communication: messages may arrive out of order or be duplicated, but you still need correct results.