pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> { /* private fields */ }
Expand description
Keyed Streams capture streaming elements of type V
grouped by a key of type K
,
where the order of keys is non-deterministic but the order within each group may
be deterministic.
Type Parameters:
K
: the type of the key for each groupV
: the type of the elements inside each groupLoc
: theLocation
where the keyed stream is materializedBound
: tracks whether the entries areBounded
(local and finite) orUnbounded
(asynchronous and possibly infinite)Order
: tracks whether the elements within each group have deterministic order (TotalOrder
) or not (NoOrder
)Retries
: tracks whether the elements within each group have deterministic cardinality (ExactlyOnce
) or may have non-deterministic retries (crate::stream::AtLeastOnce
)
Implementations§
Source§impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
pub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
Source§impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
pub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
Source§impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
Sourcepub fn assume_ordering<O2>(
self,
_nondet: NonDet,
) -> KeyedStream<K, V, L, B, O2, R>
pub fn assume_ordering<O2>( self, _nondet: NonDet, ) -> KeyedStream<K, V, L, B, O2, R>
Explicitly “casts” the keyed stream to a type with a different ordering guarantee for each group. Useful in unsafe code where the ordering cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.
Sourcepub fn assume_retries<R2>(
self,
nondet: NonDet,
) -> KeyedStream<K, V, L, B, O, R2>
pub fn assume_retries<R2>( self, nondet: NonDet, ) -> KeyedStream<K, V, L, B, O, R2>
Explicitly “casts” the keyed stream to a type with a different retries guarantee for each group. Useful in unsafe code where the lack of retries cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided retries guarantee will propagate into the guarantees for the rest of the program.
Sourcepub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R>
pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R>
Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic element ordering.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.entries()
// (1, 2), (1, 3), (2, 4) in any order
Sourcepub fn values(self) -> Stream<V, L, B, NoOrder, R>
pub fn values(self) -> Stream<V, L, B, NoOrder, R>
Flattens the keyed stream into a single stream of only the values, with non-deterministic element ordering.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.values()
// 2, 3, 4 in any order
Sourcepub fn map<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>where
F: Fn(V) -> U + 'a,
pub fn map<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>where
F: Fn(V) -> U + 'a,
Transforms each value by invoking f
on each element, with keys staying the same
after transformation. If you need access to the key, see KeyedStream::map_with_key
.
If you do not want to modify the stream and instead only want to view
each item use KeyedStream::inspect
instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.map(q!(|v| v + 1))
// { 1: [3, 4], 2: [5] }
Sourcepub fn map_with_key<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
pub fn map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
Transforms each value by invoking f
on each key-value pair. The resulting values are not
re-grouped even they are tuples; instead they will be grouped under the original key.
If you do not want to modify the stream and instead only want to view
each item use KeyedStream::inspect_with_key
instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.map_with_key(q!(|(k, v)| k + v))
// { 1: [3, 4], 2: [6] }
Sourcepub fn filter<F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
pub fn filter<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
Creates a stream containing only the elements of each group stream that satisfy a predicate
f
, preserving the order of the elements within the group.
The closure f
receives a reference &V
rather than an owned value v
because filtering does
not modify or take ownership of the values. If you need to modify the values while filtering
use KeyedStream::filter_map
instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.filter(q!(|&x| x > 2))
// { 1: [3], 2: [4] }
Sourcepub fn filter_with_key<F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
pub fn filter_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
Creates a stream containing only the elements of each group stream that satisfy a predicate
f
(which receives the key-value tuple), preserving the order of the elements within the group.
The closure f
receives a reference &(K, V)
rather than an owned value (K, V)
because filtering does
not modify or take ownership of the values. If you need to modify the values while filtering
use KeyedStream::filter_map_with_key
instead.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.filter_with_key(q!(|&(k, v)| v - k == 2))
// { 1: [3], 2: [4] }
Sourcepub fn filter_map<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
An operator that both filters and maps each value, with keys staying the same.
It yields only the items for which the supplied closure f
returns Some(value)
.
If you need access to the key, see KeyedStream::filter_map_with_key
.
§Example
process
.source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
.into_keyed()
.filter_map(q!(|s| s.parse::<usize>().ok()))
// { 1: [2], 2: [4] }
Sourcepub fn filter_map_with_key<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, O, R>
pub fn filter_map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
An operator that both filters and maps each key-value pair. The resulting values are not
re-grouped even they are tuples; instead they will be grouped under the original key.
It yields only the items for which the supplied closure f
returns Some(value)
.
§Example
process
.source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
.into_keyed()
.filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
// { 2: [2] }
Sourcepub fn inspect<F>(
self,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, V, L, B, O, R>
pub fn inspect<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “inspect” each element of a stream without
modifying it. The closure f
is called on a reference to each value. This is
mainly useful for debugging, and should not be used to generate side-effects.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.inspect(q!(|v| println!("{}", v)))
Sourcepub fn inspect_with_key<F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> KeyedStream<K, V, L, B, O, R>
pub fn inspect_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “inspect” each element of a stream without
modifying it. The closure f
is called on a reference to each key-value pair. This is
mainly useful for debugging, and should not be used to generate side-effects.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
.inspect(q!(|v| println!("{}", v)))
Sourcepub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R>
pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R>
An operator which allows you to “name” a HydroNode
.
This is only used for testing, to correlate certain HydroNode
s with IDs.
Source§impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R>
impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R>
Sourcepub fn interleave<O2, R2: MinRetries<R>>(
self,
other: KeyedStream<K, V, L, Unbounded, O2, R2>,
) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>where
R: MinRetries<R2, Min = R2::Min>,
pub fn interleave<O2, R2: MinRetries<R>>(
self,
other: KeyedStream<K, V, L, Unbounded, O2, R2>,
) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>where
R: MinRetries<R2, Min = R2::Min>,
Produces a new keyed stream that “merges” the inputs by interleaving the elements
of any overlapping groups. The result has NoOrder
on each group because the
order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
the ordering will be deterministic and you can safely use Self::assume_ordering
.
Currently, both input streams must be Unbounded
.
§Example
let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
numbers1.interleave(numbers2)
// { 1: [2, 3], 3: [4, 5] } with each group in unknown order
Source§impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
Sourcepub fn scan<A, U, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L> + Copy,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
pub fn scan<A, U, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
A special case of Stream::scan
for keyd streams. For each key group the values are transformed via the f
combinator.
Unlike Stream::fold_keyed
which only returns the final accumulated value, scan
produces a new stream
containing all intermediate accumulated values paired with the key. The scan operation can also terminate
early by returning None
.
The function takes a mutable reference to the accumulator and the current element, and returns
an Option<U>
. If the function returns Some(value)
, value
is emitted to the output stream.
If the function returns None
, the stream is terminated and no more elements are processed.
§Example
process
.source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
.into_keyed()
.scan(
q!(|| 0),
q!(|acc, x| {
*acc += x;
Some(*acc)
}),
)
// Output: { 0: [1, 3], 1: [3, 7] }
Sourcepub fn fold_early_stop<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L> + Copy,
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
pub fn fold_early_stop<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
A variant of Stream::fold
, intended for keyed streams. The aggregation is executed in-order across the values
in each group. But the aggregation function returns a boolean, which when true indicates that the aggregated
result is complete and can be released to downstream computation. Unlike Stream::fold_keyed
, this means that
even if the input stream is crate::Unbounded
, the outputs of the fold can be processed like normal stream elements.
§Example
process
.source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
.into_keyed()
.fold_early_stop(
q!(|| 0),
q!(|acc, x| {
*acc += x;
x % 2 == 0
}),
)
// Output: { 0: 2, 1: 9 }
Sourcepub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold
, aggregates the values in each group via the comb
closure.
Each group must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the group.
If the input and output value types are the same and do not require initialization then use
KeyedStream::reduce
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold(q!(|| 0), q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce
, aggregates the values in each group via the comb
closure.
Each group must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce_watermark<O, F>(
self,
other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_watermark<O, F>( self, other: impl Into<Optional<O, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce
where tuples with keys less than the watermark are automatically deleted.
Each group must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark(watermark, q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (2, 204)
Source§impl<'a, K, V, L, B: Boundedness, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
impl<'a, K, V, L, B: Boundedness, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
Sourcepub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_commutative
, aggregates the values in each group via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed.
If the input and output value types are the same and do not require initialization then use
KeyedStream::reduce_commutative
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_commutative
, aggregates the values in each group via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative(q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce_watermark_commutative<O2, F>(
self,
other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_watermark_commutative<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce_commutative
where tuples with keys less than the watermark are automatically deleted.
The comb
closure must be commutative, as the order of input items is not guaranteed.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
.entries()
.all_ticks()
// (2, 204)
Source§impl<'a, K, V, L, B: Boundedness, R> KeyedStream<K, V, L, B, TotalOrder, R>
impl<'a, K, V, L, B: Boundedness, R> KeyedStream<K, V, L, B, TotalOrder, R>
Sourcepub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_idempotent
, aggregates the values in each group via the comb
closure.
The comb
closure must be idempotent as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use
KeyedStream::reduce_idempotent
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
Sourcepub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_idempotent
, aggregates the values in each group via the comb
closure.
The comb
closure must be idempotent, as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_idempotent
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_idempotent(q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
Sourcepub fn reduce_watermark_idempotent<O2, F>(
self,
other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_watermark_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of KeyedStream::reduce_idempotent
where tuples with keys less than the watermark are automatically deleted.
The comb
closure must be idempotent, as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (2, true)
Source§impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
Sourcepub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
Like Stream::fold_commutative_idempotent
, aggregates the values in each group via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed, and idempotent,
as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use
KeyedStream::reduce_commutative_idempotent
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
Sourcepub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
Like Stream::reduce_commutative_idempotent
, aggregates the values in each group via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed, and idempotent,
as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative_idempotent
.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (1, false), (2, true)
Sourcepub fn reduce_watermark_commutative_idempotent<O2, F>(
self,
other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
pub fn reduce_watermark_commutative_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
A special case of Stream::reduce_keyed_commutative_idempotent
where tuples with keys less than the watermark are automatically deleted.
The comb
closure must be commutative, as the order of input items is not guaranteed, and idempotent,
as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
.source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
.into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
.entries()
.all_ticks()
// (2, true)
Sourcepub fn filter_key_not_in<O2, R2>(
self,
other: Stream<K, L, Bounded, O2, R2>,
) -> Self
pub fn filter_key_not_in<O2, R2>( self, other: Stream<K, L, Bounded, O2, R2>, ) -> Self
Given a bounded stream of keys K
, returns a new keyed stream containing only the groups
whose keys are not in the bounded stream.
§Example
let tick = process.tick();
let keyed_stream = process
.source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
.batch(&tick, nondet!(/** test */))
.into_keyed();
let keys_to_remove = process
.source_iter(q!(vec![1, 2]))
.batch(&tick, nondet!(/** test */));
keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
// { 3: ['c'], 4: ['d'] }
Source§impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R>
Sourcepub fn batch(
self,
tick: &Tick<L>,
nondet: NonDet,
) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
pub fn batch( self, tick: &Tick<L>, nondet: NonDet, ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
Given a tick, returns a keyed stream corresponding to a batch of elements segmented by that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
Source§impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
Sourcepub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>
Returns a keyed stream corresponding to the latest batch of elements being atomically processed. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
Source§impl<'a, K, V, L, O, R> KeyedStream<K, V, L, Bounded, O, R>where
L: Location<'a>,
impl<'a, K, V, L, O, R> KeyedStream<K, V, L, Bounded, O, R>where
L: Location<'a>,
pub fn chain<O2>(
self,
other: KeyedStream<K, V, L, Bounded, O2, R>,
) -> KeyedStream<K, V, L, Bounded, O::Min, R>where
O: MinOrder<O2>,
Source§impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>where
L: Location<'a>,
impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>where
L: Location<'a>,
pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R>
Trait Implementations§
Source§impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order, Retries> Clone for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order, Retries> Clone for KeyedStream<K, V, Loc, Bound, Order, Retries>
Source§impl<'a, K, V, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
impl<'a, K, V, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
Source§impl<'a, K, V, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
impl<'a, K, V, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
fn complete(self, ident: Ident, expected_location: LocationId)
Source§impl<'a, K, V, L, B: Boundedness, R> From<KeyedStream<K, V, L, B, TotalOrder, R>> for KeyedStream<K, V, L, B, NoOrder, R>where
L: Location<'a>,
impl<'a, K, V, L, B: Boundedness, R> From<KeyedStream<K, V, L, B, TotalOrder, R>> for KeyedStream<K, V, L, B, NoOrder, R>where
L: Location<'a>,
Source§fn from(
stream: KeyedStream<K, V, L, B, TotalOrder, R>,
) -> KeyedStream<K, V, L, B, NoOrder, R>
fn from( stream: KeyedStream<K, V, L, B, TotalOrder, R>, ) -> KeyedStream<K, V, L, B, NoOrder, R>
Auto Trait Implementations§
impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Freeze for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !RefUnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Send for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Sync for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<K, V, Loc, Bound, Order, Retries> Unpin for KeyedStream<K, V, Loc, Bound, Order, Retries>
impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !UnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retries>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more