pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order: Ordering = TotalOrder, Retry: Retries = ExactlyOnce> { /* private fields */ }
Streaming elements of type V grouped by a key of type K.
Keyed Streams capture streaming elements of type V grouped by a key of type K, where the order of keys is non-deterministic but the order within each group may be deterministic.
Although keyed streams are conceptually grouped by keys, values are not immediately grouped into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an operator such as KeyedStream::fold is called, which requires K: Hash + Eq.
Type Parameters:
K: the type of the key for each group
V: the type of the elements inside each group
Loc: the Location where the keyed stream is materialized
Bound: tracks whether the entries are Bounded (local and finite) or Unbounded (asynchronous and possibly infinite)
Order: tracks whether the elements within each group have deterministic order (TotalOrder) or not (NoOrder)
Retry: tracks whether the elements within each group have deterministic cardinality (ExactlyOnce) or may have non-deterministic retries (crate::live_collections::stream::AtLeastOnce)
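The deferred grouping described above can be pictured with a plain-Rust model. This is only a sketch using the standard library, not Hydro's implementation: `fold_by_key` is a hypothetical helper, and a keyed stream is modeled as a flat sequence of `(K, V)` pairs that is bucketed only when an aggregation runs.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model: a keyed stream is a flat list of (key, value) pairs;
/// buckets are built only when an aggregation is requested.
fn fold_by_key<K, V, A>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    comb: impl Fn(&mut A, V),
) -> HashMap<K, A>
where
    K: Hash + Eq, // grouping is the step that needs hashing and equality
{
    let mut groups: HashMap<K, A> = HashMap::new();
    for (k, v) in pairs {
        // Values that share a key are combined in arrival order.
        comb(groups.entry(k).or_insert_with(&init), v);
    }
    groups
}
```

In this model the `K: Hash + Eq` bound appears only on the aggregating function, which mirrors why constructing a keyed stream does not need it.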
Implementations§
impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
pub fn demux_bincode(
    self,
    other: &Cluster<'a, L2>,
) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
where
    T: Serialize + DeserializeOwned,
Sends each group of this stream to a specific member of a cluster, with the MemberId key identifying the recipient for each group and using bincode to serialize/deserialize messages.
Each key must be a MemberId<L2> and each value must be a T where the key specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode, this API allows precise targeting of specific cluster members rather than broadcasting to all members.
§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
.into_keyed()
.demux_bincode(&workers);
// if there are 4 members in the cluster, each receives one element
// - MemberId::<()>(0): [0]
// - MemberId::<()>(1): [1]
// - MemberId::<()>(2): [2]
// - MemberId::<()>(3): [3]
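The routing in the example above can be modeled without any networking. The sketch below is an assumption-laden illustration, not Hydro code: `MemberIndex` and `demux` are hypothetical names, a raw `u32` stands in for `MemberId`, and serialization is omitted entirely.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `MemberId`: just the raw member index.
type MemberIndex = u32;

/// Models demux: each (recipient, value) pair is delivered only to the
/// inbox of the recipient named by its key.
fn demux<T>(pairs: Vec<(MemberIndex, T)>) -> BTreeMap<MemberIndex, Vec<T>> {
    let mut inboxes: BTreeMap<MemberIndex, Vec<T>> = BTreeMap::new();
    for (recipient, value) in pairs {
        inboxes.entry(recipient).or_default().push(value);
    }
    inboxes
}
```

Feeding it the pairs `(0, 0), (1, 1), (2, 2), (3, 3)` gives each of the four inboxes exactly one element, matching the comment in the example.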
impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
pub fn demux_bincode(
    self,
    other: &Cluster<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
where
    T: Serialize + DeserializeOwned,
Sends each group of this stream at each source member to a specific member of a destination cluster, with the MemberId key identifying the recipient for each group and using bincode to serialize/deserialize messages.
Each key must be a MemberId<L2> and each value must be a T where the key specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode, this API allows precise targeting of specific cluster members rather than broadcasting to all members.
Each cluster member sends its local stream elements, and they are collected at each destination member as a KeyedStream where keys identify the source cluster member.
§Example
let source: Cluster<Source> = flow.cluster::<Source>();
let to_send: KeyedStream<_, _, Cluster<_>, _> = source
.source_iter(q!(vec![0, 1, 2, 3]))
.map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
.into_keyed();
let destination: Cluster<Destination> = flow.cluster::<Destination>();
let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
// if there are 4 members in the destination cluster, each receives one message from each source member
// - Destination(0): { Source(0): [0], Source(1): [0], ... }
// - Destination(1): { Source(0): [1], Source(1): [1], ... }
// - ...
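The re-keying by source member can be sketched in plain Rust as well. This is a hypothetical model, not the library's implementation: `demux_between_clusters` is an invented helper, member indices are plain `usize` values, and the position in the outer `Vec` plays the role of the source member's ID.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of cluster-to-cluster demux. `outgoing[s]` holds the
/// (destination, value) pairs emitted by source member `s`. The result maps
/// each destination index to the values it received, keyed by source index.
fn demux_between_clusters<T>(
    outgoing: Vec<Vec<(usize, T)>>,
) -> BTreeMap<usize, BTreeMap<usize, Vec<T>>> {
    let mut received: BTreeMap<usize, BTreeMap<usize, Vec<T>>> = BTreeMap::new();
    for (source, pairs) in outgoing.into_iter().enumerate() {
        for (destination, value) in pairs {
            received
                .entry(destination)
                .or_default()
                .entry(source)
                .or_default()
                .push(value);
        }
    }
    received
}
```

With two source members that each emit `(0, 0), (1, 1), (2, 2), (3, 3)`, destination 0 ends up with `{ 0: [0], 1: [0] }`, the same shape as the comment in the example.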
impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
pub fn assume_ordering<O2: Ordering>(
    self,
    _nondet: NonDet,
) -> KeyedStream<K, V, L, B, O2, R>
Explicitly “casts” the keyed stream to a type with a different ordering guarantee for each group. Useful in unsafe code where the ordering cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.
pub fn assume_retries<R2: Retries>(
    self,
    nondet: NonDet,
) -> KeyedStream<K, V, L, B, O, R2>
Explicitly “casts” the keyed stream to a type with a different retries guarantee for each group. Useful in unsafe code where the lack of retries cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided retries guarantee will propagate into the guarantees for the rest of the program.
pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R>
Flattens the keyed stream into an unordered stream of key-value pairs.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.entries()
// (1, 2), (1, 3), (2, 4) in any order
pub fn values(self) -> Stream<V, L, B, NoOrder, R>
Flattens the keyed stream into an unordered stream of only the values.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.values()
// 2, 3, 4 in any order
pub fn map<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
where
    F: Fn(V) -> U + 'a,
Transforms each value by invoking f on each element, with keys staying the same after transformation. If you need access to the key, see KeyedStream::map_with_key.
If you do not want to modify the stream and instead only want to view each item, use KeyedStream::inspect instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.map(q!(|v| v + 1))
// { 1: [3, 4], 2: [5] }
pub fn map_with_key<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
Transforms each value by invoking f on each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key.
If you do not want to modify the stream and instead only want to view each item, use KeyedStream::inspect_with_key instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.map_with_key(q!(|(k, v)| k + v))
// { 1: [3, 4], 2: [6] }
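The remark that tuple results are not re-grouped can be illustrated with a small standard-library model. This sketch is hypothetical: it represents an already-grouped keyed stream as a `BTreeMap`, and the free function `map_with_key` below is a stand-in, not the method documented here.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of `map_with_key`: `f` sees the key and the value, but
/// its result is always stored under the original key.
fn map_with_key<K: Ord + Clone, V, U>(
    groups: BTreeMap<K, Vec<V>>,
    f: impl Fn((K, V)) -> U,
) -> BTreeMap<K, Vec<U>> {
    groups
        .into_iter()
        .map(|(k, values)| {
            let mapped: Vec<U> = values.into_iter().map(|v| f((k.clone(), v))).collect();
            (k, mapped)
        })
        .collect()
}
```

Even when `f` returns a tuple such as `(k + v, v)`, the output under key `1` is a list of tuples; the first tuple component is never treated as a new key.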
pub fn filter<F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
Creates a stream containing only the elements of each group stream that satisfy a predicate f, preserving the order of the elements within the group.
The closure f receives a reference &V rather than an owned value V because filtering does not modify or take ownership of the values. If you need to modify the values while filtering, use KeyedStream::filter_map instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.filter(q!(|&x| x > 2))
// { 1: [3], 2: [4] }
pub fn filter_with_key<F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
Creates a stream containing only the elements of each group stream that satisfy a predicate f (which receives the key-value tuple), preserving the order of the elements within the group.
The closure f receives a reference &(K, V) rather than an owned value (K, V) because filtering does not modify or take ownership of the values. If you need to modify the values while filtering, use KeyedStream::filter_map_with_key instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.filter_with_key(q!(|&(k, v)| v - k == 2))
// { 1: [3], 2: [4] }
pub fn filter_map<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
An operator that both filters and maps each value, with keys staying the same. It yields only the items for which the supplied closure f returns Some(value).
If you need access to the key, see KeyedStream::filter_map_with_key.
§Example
process
.source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
.into_keyed()
.filter_map(q!(|s| s.parse::<usize>().ok()))
// { 1: [2], 2: [4] }
pub fn filter_map_with_key<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
An operator that both filters and maps each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key. It yields only the items for which the supplied closure f returns Some(value).
§Example
process
.source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
.into_keyed()
.filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
// { 2: [2] }
pub fn flat_map_ordered<U, I, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
For each value v in each group, transform v using f and then treat the result as an Iterator to produce values one by one within the same group. The implementation for Iterator for the output type I must produce items in a deterministic order.
For example, I could be a Vec, but not a HashSet. If the order of the items in I is not deterministic, use KeyedStream::flat_map_unordered instead.
§Example
process
.source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
.into_keyed()
.flat_map_ordered(q!(|x| x))
// { 1: [2, 3, 4], 2: [5, 6] }
pub fn flat_map_unordered<U, I, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, NoOrder, R>
Like KeyedStream::flat_map_ordered, but allows the implementation of Iterator for the output type I to produce items in any order.
§Example
process
.source_iter(q!(vec![
(1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
(2, std::collections::HashSet::from_iter(vec![4, 5]))
]))
.into_keyed()
.flat_map_unordered(q!(|x| x))
// { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
For each value v in each group, treat v as an Iterator and produce its items one by one within the same group. The implementation for Iterator for the value type V must produce items in a deterministic order.
For example, V could be a Vec, but not a HashSet. If the order of the items in V is not deterministic, use KeyedStream::flatten_unordered instead.
§Example
process
.source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
.into_keyed()
.flatten_ordered()
// { 1: [2, 3, 4], 2: [5, 6] }
pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
Like KeyedStream::flatten_ordered, but allows the implementation of Iterator for the value type V to produce items in any order.
§Example
process
.source_iter(q!(vec![
(1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
(2, std::collections::HashSet::from_iter(vec![4, 5]))
]))
.into_keyed()
.flatten_unordered()
// { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
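The difference between the ordered and unordered variants comes down to the iteration order of the inner collection. The following standard-library sketch models a single group only; the two free functions are hypothetical illustrations, not the methods documented above.

```rust
use std::collections::HashSet;

/// Flattening `Vec`s preserves a deterministic order within the group.
fn flatten_ordered(group: Vec<Vec<i32>>) -> Vec<i32> {
    group.into_iter().flatten().collect()
}

/// Flattening `HashSet`s yields the same elements, but their iteration order
/// is unspecified, so callers may only rely on the contents.
fn flatten_unordered(group: Vec<HashSet<i32>>) -> Vec<i32> {
    group.into_iter().flatten().collect()
}
```

A caller can compare the first result directly against `[2, 3, 4]`, whereas the second result has to be sorted (or compared as a set) before any assertion is meaningful. That is the property the NoOrder marker records in the type.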
pub fn inspect<F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each value. This is mainly useful for debugging, and should not be used to generate side-effects.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.inspect(q!(|v| println!("{}", v)))
pub fn inspect_with_key<F>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each key-value pair. This is mainly useful for debugging, and should not be used to generate side-effects.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “name” a HydroNode.
This is only used for testing, to correlate certain HydroNodes with IDs.
impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R>
pub fn interleave<O2: Ordering, R2: Retries>(
    self,
    other: KeyedStream<K, V, L, Unbounded, O2, R2>,
) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
where
    R: MinRetries<R2>,
Produces a new keyed stream that “merges” the inputs by interleaving the elements of any overlapping groups. The result has NoOrder on each group because the order of interleaving is not guaranteed. If the keys across both inputs do not overlap, the ordering will be deterministic and you can safely use Self::assume_ordering.
Currently, both input streams must be Unbounded.
§Example
let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
numbers1.interleave(numbers2)
// { 1: [2, 3], 3: [4, 5] } with each group in unknown order
impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
pub fn scan<A, U, I, F>(
    self,
    init: impl IntoQuotedMut<'a, I, L> + Copy,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
A special case of Stream::scan for keyed streams. For each key group the values are transformed via the f combinator.
Unlike Stream::fold_keyed which only returns the final accumulated value, scan produces a new stream containing all intermediate accumulated values paired with the key. The scan operation can also terminate early by returning None.
The function takes a mutable reference to the accumulator and the current element, and returns an Option<U>. If the function returns Some(value), value is emitted to the output stream. If the function returns None, the stream is terminated and no more elements are processed.
§Example
process
.source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
.into_keyed()
.scan(
q!(|| 0),
q!(|acc, x| {
*acc += x;
if *acc % 2 == 0 { None } else { Some(*acc) }
}),
)
// Output: { 0: [1], 1: [3, 7] }
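The per-key behavior shown in the example can be reproduced with a small model. This is a sketch under stated assumptions, not the library's implementation: `scan_by_key` is a hypothetical helper, keys are `u32`, values are `i32`, and termination is assumed to apply to each group independently, as the example output suggests.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of a per-key scan: every key gets its own accumulator,
/// each `Some` result is emitted, and the first `None` stops that key's group.
fn scan_by_key<A, U>(
    pairs: Vec<(u32, i32)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, i32) -> Option<U>,
) -> BTreeMap<u32, Vec<U>> {
    // Per-key state: (accumulator, whether the group has terminated).
    let mut state: BTreeMap<u32, (A, bool)> = BTreeMap::new();
    let mut out: BTreeMap<u32, Vec<U>> = BTreeMap::new();
    for (k, v) in pairs {
        let (acc, done) = state.entry(k).or_insert_with(|| (init(), false));
        if *done {
            continue; // this group already returned `None`
        }
        match f(acc, v) {
            Some(u) => out.entry(k).or_default().push(u),
            None => *done = true,
        }
    }
    out
}
```

Running it on the example input with the same closure yields `[1]` for key 0 (the running sum reaches 4 and stops) and `[3, 7]` for key 1.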
pub fn generator<A, U, I, F>(
    self,
    init: impl IntoQuotedMut<'a, I, L> + Copy,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
Iteratively processes the elements in each group using a state machine that can yield elements as it processes its inputs. This is designed to mirror the unstable generator syntax in Rust, without requiring special syntax.
Like KeyedStream::scan, this function takes in an initializer that emits the initial state for each group. The second argument defines the processing logic, taking in a mutable reference to the group’s state and the value to be processed. It emits a Generate value, whose variants define what is emitted and whether further inputs should be processed.
§Example
process
.source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
.into_keyed()
.generator(
q!(|| 0),
q!(|acc, x| {
*acc += x;
if *acc > 100 {
hydro_lang::live_collections::keyed_stream::Generate::Return(
"done!".to_string()
)
} else if *acc % 2 == 0 {
hydro_lang::live_collections::keyed_stream::Generate::Yield(
"even".to_string()
)
} else {
hydro_lang::live_collections::keyed_stream::Generate::Continue
}
}),
)
// Output: { 0: ["even", "done!"], 1: ["even"] }
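The state machine in the example can be traced with a standalone model. Everything below is an assumption made for illustration: the local `Generate` enum mirrors only the three variants used in the example (the real type may have others), and `generate_by_key` is a hypothetical helper with fixed `u32` keys and `i32` values.

```rust
use std::collections::BTreeMap;

/// Local stand-in mirroring the variants used in the example above.
enum Generate<T> {
    Yield(T),  // emit a value and keep processing the group
    Return(T), // emit a final value and stop processing the group
    Continue,  // emit nothing and keep processing the group
}

/// Hypothetical per-key driver for the state machine.
fn generate_by_key<A, U>(
    pairs: Vec<(u32, i32)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, i32) -> Generate<U>,
) -> BTreeMap<u32, Vec<U>> {
    let mut state: BTreeMap<u32, (A, bool)> = BTreeMap::new();
    let mut out: BTreeMap<u32, Vec<U>> = BTreeMap::new();
    for (k, v) in pairs {
        let (acc, done) = state.entry(k).or_insert_with(|| (init(), false));
        if *done {
            continue; // the group already returned
        }
        match f(acc, v) {
            Generate::Yield(u) => out.entry(k).or_default().push(u),
            Generate::Return(u) => {
                out.entry(k).or_default().push(u);
                *done = true;
            }
            Generate::Continue => {}
        }
    }
    out
}
```

For key 0 the running sums are 1, 4, and 104, so the model emits "even" and then "done!" and ignores the trailing 10. For key 1 the sums are 3, 7, and 10, so only "even" is emitted.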
pub fn fold_early_stop<A, I, F>(
    self,
    init: impl IntoQuotedMut<'a, I, L> + Copy,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
A variant of Stream::fold, intended for keyed streams. The aggregation is executed in-order across the values in each group. The aggregation function returns a boolean, which when true indicates that the aggregated result is complete and can be released to downstream computation. Unlike Stream::fold_keyed, this means that even if the input stream is super::boundedness::Unbounded, the outputs of the fold can be processed like normal stream elements.
§Example
process
.source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
.into_keyed()
.fold_early_stop(
q!(|| 0),
q!(|acc, x| {
*acc += x;
x % 2 == 0
}),
)
// Output: { 0: 2, 1: 9 }
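A plain-Rust model makes the early-stop rule concrete. This is a hedged sketch rather than the real operator: `fold_early_stop_by_key` is a hypothetical helper with fixed `u32` keys and `i32` values, and it assumes that a key's result is released only once its closure has returned true.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of `fold_early_stop`: values are folded in order per
/// key, and a key's result is frozen as soon as `f` returns `true`.
fn fold_early_stop_by_key<A>(
    pairs: Vec<(u32, i32)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, i32) -> bool,
) -> BTreeMap<u32, A> {
    let mut state: BTreeMap<u32, (A, bool)> = BTreeMap::new();
    for (k, v) in pairs {
        let (acc, done) = state.entry(k).or_insert_with(|| (init(), false));
        if !*done {
            *done = f(acc, v);
        }
    }
    // Only keys whose fold signalled completion have a released result.
    state
        .into_iter()
        .filter(|(_, (_, done))| *done)
        .map(|(k, (acc, _))| (k, acc))
        .collect()
}
```

On the example input, key 0 completes immediately at the even value 2, while key 1 accumulates 3 and then completes at 6 with a total of 9.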
pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
where
    K: Clone,
Gets the first element inside each group of values as a KeyedSingleton that preserves the original group keys. Requires the input stream to have TotalOrder guarantees, otherwise the first element would be non-deterministic.
§Example
process
.source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
.into_keyed()
.first()
// Output: { 0: 2, 1: 3 }
pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
    self,
    init: impl IntoQuotedMut<'a, I, L>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold, aggregates the values in each group via the comb closure.
Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.
If the input and output value types are the same and do not require initialization then use KeyedStream::reduce.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold(q!(|| 0), q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
pub fn reduce<F: Fn(&mut V, V) + 'a>(
    self,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce, aggregates the values in each group via the comb closure.
Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the stream.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
// (1, 5), (2, 7)
pub fn reduce_watermark<O, F>(
    self,
    other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce where tuples with keys less than the watermark are automatically deleted.
Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the stream.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark(watermark, q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (2, 204)
impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
    self,
    init: impl IntoQuotedMut<'a, I, L>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_commutative, aggregates the values in each group via the comb closure.
The comb closure must be commutative, as the order of input items is not guaranteed.
If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_commutative.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
    self,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_commutative, aggregates the values in each group via the comb closure.
The comb closure must be commutative, as the order of input items is not guaranteed.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative(q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
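The commutativity requirement can be spot-checked outside of Hydro. The helpers below are hypothetical and use only the standard library; comparing a forward and a reversed fold tests a necessary condition for order-insensitivity, not a proof of it.

```rust
/// Folds `values` with `comb`, starting from `init`.
fn fold_all<A>(init: A, values: &[i32], comb: impl Fn(&mut A, i32)) -> A {
    let mut acc = init;
    for &v in values {
        comb(&mut acc, v);
    }
    acc
}

/// Checks one necessary condition for commutativity: folding the values in
/// forward and reverse order gives the same result.
fn same_in_both_orders<A: PartialEq + Clone>(
    init: A,
    values: &[i32],
    comb: impl Fn(&mut A, i32),
) -> bool {
    let reversed: Vec<i32> = values.iter().rev().copied().collect();
    fold_all(init.clone(), values, &comb) == fold_all(init, &reversed, &comb)
}
```

Addition passes this check, so it is a plausible `comb` for the commutative variants. Pushing onto a `Vec` fails it, because the result records arrival order.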
pub fn reduce_watermark_commutative<O2, F>(
    self,
    other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce_commutative where tuples with keys less than the watermark are automatically deleted.
The comb closure must be commutative, as the order of input items is not guaranteed.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (2, 204)
impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
    self,
    init: impl IntoQuotedMut<'a, I, L>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_idempotent, aggregates the values in each group via the comb closure.
The comb closure must be idempotent, as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_idempotent.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
    self,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_idempotent, aggregates the values in each group via the comb closure.
The comb closure must be idempotent, as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_idempotent.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_idempotent(q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
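Why the examples use a boolean OR rather than a sum can be seen by simulating retries. The helper below is a hypothetical standard-library sketch: it models at-least-once delivery only in the simplest form, where each element is immediately delivered a second time.

```rust
/// Applies `comb` to each value once, or twice when `duplicate` is set, to
/// mimic at-least-once delivery.
fn fold_with_retries<A>(
    init: A,
    values: &[bool],
    duplicate: bool,
    comb: impl Fn(&mut A, bool),
) -> A {
    let mut acc = init;
    for &v in values {
        comb(&mut acc, v);
        if duplicate {
            comb(&mut acc, v); // simulated retry of the same element
        }
    }
    acc
}
```

With `*acc |= x` the result is identical with and without duplicates, so OR is safe under retries. A closure that counts `true` values gives a different answer once elements are repeated, which is the kind of `comb` these operators cannot accept.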
pub fn reduce_watermark_idempotent<O2, F>(
    self,
    other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce_idempotent where tuples with keys less than the watermark are automatically deleted.
The comb closure must be idempotent, as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (2, true)
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
    self,
    init: impl IntoQuotedMut<'a, I, L>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_commutative_idempotent, aggregates the values in each group via the comb closure.
The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_commutative_idempotent.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
    self,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_commutative_idempotent, aggregates the values in each group via the comb closure.
The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative_idempotent.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
pub fn reduce_watermark_commutative_idempotent<O2, F>(
    self,
    other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
    comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce_commutative_idempotent where tuples with keys less than the watermark are automatically deleted.
The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (2, true)
pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
    self,
    other: Stream<K, L, Bounded, O2, R2>,
) -> Self
Given a bounded stream of keys K, returns a new keyed stream containing only the groups whose keys are not in the bounded stream.
§Example
let tick = process.tick();
let keyed_stream = process
.source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
.batch(&tick, nondet!(/** test */))
.into_keyed();
let keys_to_remove = process
.source_iter(q!(vec![1, 2]))
.batch(&tick, nondet!(/** test */));
keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
// { 3: ['c'], 4: ['d'] }
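The filtering behaviour in the example above can be modelled with standard collections. This is a sketch of the semantics only, under the assumption that both inputs are fully available (as they are for bounded streams); the free function `filter_key_not_in` below is a hypothetical stand-in operating on slices, not the library method.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical model: keep only the entries whose key does not appear in
/// `keys_to_remove`, grouping the surviving values by key.
fn filter_key_not_in(entries: &[(u32, char)], keys_to_remove: &[u32]) -> BTreeMap<u32, Vec<char>> {
    // The set of keys to drop must be complete before any entry is emitted.
    let blocked: HashSet<u32> = keys_to_remove.iter().copied().collect();

    let mut groups: BTreeMap<u32, Vec<char>> = BTreeMap::new();
    for &(k, v) in entries {
        if !blocked.contains(&k) {
            groups.entry(k).or_default().push(v);
        }
    }
    groups
}

fn main() {
    let kept = filter_key_not_in(&[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')], &[1, 2]);
    println!("{:?}", kept); // {3: ['c'], 4: ['d']}
}
```

The model collects every key to remove before inspecting any entry, which is one way to see why the stream of keys is required to be bounded.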
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R>
Shifts this keyed stream into an atomic context, which guarantees that any downstream logic will all be executed synchronously before any outputs are yielded (in KeyedStream::end_atomic).
This is useful to enforce local consistency constraints, such as ensuring that a write is processed before an acknowledgement is emitted. Entering an atomic section requires a Tick argument that declares where the stream will be atomically processed. Batching a stream into the same Tick will preserve the synchronous execution, while batching into a different Tick will introduce asynchrony.
pub fn batch(
    self,
    tick: &Tick<L>,
    nondet: NonDet,
) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
Given a tick, returns a keyed stream corresponding to a batch of elements segmented by that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
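To make the guarantee concrete, here is a minimal plain-Rust sketch of what "contiguous" and "order-preserving" mean when the boundaries themselves are arbitrary. It is a model only: `split_at_cuts` and `values_for_key` are hypothetical helpers, and the cut points stand in for batch boundaries that a real execution would choose non-deterministically.

```rust
/// Hypothetical model of batching: splits `input` into contiguous batches at
/// the given cut points. `cuts` must be sorted ascending, and every cut must
/// be at most `input.len()`.
fn split_at_cuts(input: &[(u32, char)], cuts: &[usize]) -> Vec<Vec<(u32, char)>> {
    let mut batches = Vec::new();
    let mut start = 0;
    for &cut in cuts {
        batches.push(input[start..cut].to_vec());
        start = cut;
    }
    batches.push(input[start..].to_vec());
    batches
}

/// Concatenates the batches in order and returns the values seen for `key`.
fn values_for_key(batches: &[Vec<(u32, char)>], key: u32) -> Vec<char> {
    batches
        .iter()
        .flatten()
        .filter(|(k, _)| *k == key)
        .map(|&(_, v)| v)
        .collect()
}

fn main() {
    let input: [(u32, char); 5] = [(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd'), (1, 'e')];

    // Two possible executions with different batch boundaries.
    let run_a = split_at_cuts(&input, &[2]);
    let run_b = split_at_cuts(&input, &[1, 3, 4]);

    // The batches differ, but the per-key sequence across batches does not.
    assert_ne!(run_a, run_b);
    assert_eq!(values_for_key(&run_a, 1), values_for_key(&run_b, 1));
    assert_eq!(values_for_key(&run_a, 2), values_for_key(&run_b, 2));
}
```

Logic that depends only on the concatenation of batches is unaffected by the boundaries, whereas logic that inspects a single batch (for example, its length) observes the non-determinism.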
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
Returns a keyed stream corresponding to the latest batch of elements being atomically processed. These batches are guaranteed to be contiguous across ticks and preserve the order of the input. The output keyed stream will execute in the Tick that was used to create the atomic section.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R>
Yields the elements of this keyed stream back into a top-level, asynchronous execution context. See KeyedStream::atomic for more details.
impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
where
    L: Location<'a>,
pub fn chain<O2: Ordering>(
    self,
    other: KeyedStream<K, V, L, Bounded, O2, R>,
) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
where
    O: MinOrder<O2>,
Produces a new keyed stream that combines the groups of the inputs by first emitting the elements of the self stream and then emitting the elements of the other stream. If a key is only present in one of the inputs, its values are passed through as-is. The output has a TotalOrder guarantee if and only if both inputs have a TotalOrder guarantee.
Currently, both input streams must be Bounded. This operator will block on the first stream until all its elements are available. In a future version, we will relax the requirement on the other stream.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
// { 0: [2, 1], 1: [4, 3] }
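The per-key result in the example above can be reproduced with a small plain-Rust model. This is a sketch of the semantics, not the library's implementation: `chain_groups` is a hypothetical helper that appends every value from the first input to its group before any value from the second.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of `chain` on keyed data: for each key, all values from
/// `first` come before all values from `second`. Keys present in only one
/// input keep their values unchanged.
fn chain_groups(first: &[(u32, i32)], second: &[(u32, i32)]) -> BTreeMap<u32, Vec<i32>> {
    let mut out: BTreeMap<u32, Vec<i32>> = BTreeMap::new();
    for &(k, v) in first.iter().chain(second.iter()) {
        out.entry(k).or_default().push(v);
    }
    out
}

fn main() {
    let batch: [(u32, i32); 2] = [(0, 1), (1, 3)];
    // Mirrors `batch.clone().map(|x| x + 1)` from the example above.
    let incremented: Vec<(u32, i32)> = batch.iter().map(|&(k, v)| (k, v + 1)).collect();

    let chained = chain_groups(&incremented, &batch);
    println!("{:?}", chained); // {0: [2, 1], 1: [4, 3]}
}
```

The model consumes the first input completely before touching the second, which corresponds to the operator blocking on the first stream until all of its elements are available.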
impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R>
Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream, which will stream all the elements across all tick iterations by concatenating the batches for each key.
pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R>
Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream, which will stream all the elements across all tick iterations by concatenating the batches for each key.
Unlike KeyedStream::all_ticks, this preserves synchronous execution, as the output stream is emitted in an Atomic context that will process elements synchronously with the input stream’s Tick context.
pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
Trait Implementations§
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries> Clone for KeyedStream<K, V, Loc, Bound, Order, R>
impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>> for KeyedStream<K, V, L, B, NoOrder, R>
where
    L: Location<'a>,
fn from(
    stream: KeyedStream<K, V, L, B, TotalOrder, R>,
) -> KeyedStream<K, V, L, B, NoOrder, R>
Auto Trait Implementations§
impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Freeze for KeyedStream<K, V, Loc, Bound, Order, Retry>
impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !RefUnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retry>
impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Send for KeyedStream<K, V, Loc, Bound, Order, Retry>
impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Sync for KeyedStream<K, V, Loc, Bound, Order, Retry>
impl<K, V, Loc, Bound, Order, Retry> Unpin for KeyedStream<K, V, Loc, Bound, Order, Retry>
impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !UnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retry>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.