KeyedStream

Struct KeyedStream 

Source
pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> { /* private fields */ }

Keyed Streams capture streaming elements of type V grouped by a key of type K, where the order of keys is non-deterministic but the order within each group may be deterministic.

Type Parameters:

  • K: the type of the key for each group
  • V: the type of the elements inside each group
  • Loc: the Location where the keyed stream is materialized
  • Bound: tracks whether the entries are Bounded (local and finite) or Unbounded (asynchronous and possibly infinite)
  • Order: tracks whether the elements within each group have deterministic order (TotalOrder) or not (NoOrder)
  • Retries: tracks whether the elements within each group have deterministic cardinality (ExactlyOnce) or may have non-deterministic retries (crate::stream::AtLeastOnce)
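The Order and Retries parameters carry no runtime data; they only record guarantees in the type. The following is a minimal sketch of that marker-type pattern in plain Rust. The names `Keyed`, `Total`, and `Unordered` are hypothetical stand-ins invented for illustration and are not the Hydro definitions.

```rust
use std::marker::PhantomData;

// Hypothetical stand-ins for the ordering markers; not the Hydro types.
struct Total;
struct Unordered;

// A toy keyed collection whose ordering guarantee lives only in the type.
struct Keyed<K, V, Order = Total> {
    entries: Vec<(K, V)>,
    _order: PhantomData<Order>,
}

impl<K, V> Keyed<K, V, Total> {
    fn new(entries: Vec<(K, V)>) -> Self {
        Keyed { entries, _order: PhantomData }
    }

    // Only offered when order is guaranteed: the first value seen for a key.
    fn first_for(&self, key: &K) -> Option<&V>
    where
        K: PartialEq,
    {
        self.entries.iter().find(|(k, _)| k == key).map(|(_, v)| v)
    }
}

// Weakening a guarantee is always sound, so it can be an ordinary conversion.
impl<K, V> From<Keyed<K, V, Total>> for Keyed<K, V, Unordered> {
    fn from(s: Keyed<K, V, Total>) -> Self {
        Keyed { entries: s.entries, _order: PhantomData }
    }
}

impl<K, V> Keyed<K, V, Unordered> {
    // Order-insensitive operations remain available after weakening.
    fn len(&self) -> usize {
        self.entries.len()
    }
}
```

In this sketch, calling `first_for` on a `Keyed<_, _, Unordered>` is a compile error, which mirrors how order-sensitive operators on this page are only implemented for TotalOrder.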

Implementations§

Source§

impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>

Source

pub fn demux_bincode( self, other: &Cluster<'a, L2>, ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>

Source§

impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>

Source

pub fn demux_bincode( self, other: &Cluster<'a, L2>, ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>

Source§

impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>

Source

pub fn assume_ordering<O2>( self, _nondet: NonDet, ) -> KeyedStream<K, V, L, B, O2, R>

Explicitly “casts” the keyed stream to a type with a different ordering guarantee for each group. Useful in unsafe code where the ordering cannot be proven by the type system.

§Non-Determinism

This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.

Source

pub fn assume_retries<R2>( self, nondet: NonDet, ) -> KeyedStream<K, V, L, B, O, R2>

Explicitly “casts” the keyed stream to a type with a different retries guarantee for each group. Useful in unsafe code where the lack of retries cannot be proven by the type system.

§Non-Determinism

This function is used as an escape hatch, and any mistakes in the provided retries guarantee will propagate into the guarantees for the rest of the program.

Source

pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R>

Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic element ordering.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .entries()
// (1, 2), (1, 3), (2, 4) in any order
Source

pub fn values(self) -> Stream<V, L, B, NoOrder, R>

Flattens the keyed stream into a single stream of only the values, with non-deterministic element ordering.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .values()
// 2, 3, 4 in any order
Source

pub fn map<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn(V) -> U + 'a,

Transforms each value by invoking f on each element, with keys staying the same after transformation. If you need access to the key, see KeyedStream::map_with_key.

If you only want to view each item without modifying the stream, use KeyedStream::inspect instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .map(q!(|v| v + 1))
// { 1: [3, 4], 2: [5] }
Source

pub fn map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn((K, V)) -> U + 'a, K: Clone,

Transforms each value by invoking f on each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key.

If you only want to view each item without modifying the stream, use KeyedStream::inspect_with_key instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .map_with_key(q!(|(k, v)| k + v))
// { 1: [3, 4], 2: [6] }
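To make the "not re-grouped" rule concrete, here is a plain-Rust model of the semantics, assuming a keyed stream can be pictured as a map from each key to its values in arrival order. The helpers `into_keyed` and `map_with_key` below are hypothetical free functions written for this sketch; they do not use or reproduce the Hydro API.

```rust
use std::collections::BTreeMap;

// Model of a keyed stream: each key maps to its values in arrival order.
// BTreeMap is used only so that comparison and printing are deterministic.
fn into_keyed<K: Ord, V>(pairs: Vec<(K, V)>) -> BTreeMap<K, Vec<V>> {
    let mut groups: BTreeMap<K, Vec<V>> = BTreeMap::new();
    for (k, v) in pairs {
        groups.entry(k).or_default().push(v);
    }
    groups
}

// Model of `map_with_key`: `f` sees (key, value), but whatever it returns
// stays under the key the value came from.
fn map_with_key<K: Ord + Clone, V, U>(
    groups: BTreeMap<K, Vec<V>>,
    f: impl Fn((K, V)) -> U,
) -> BTreeMap<K, Vec<U>> {
    groups
        .into_iter()
        .map(|(k, vs)| {
            let out: Vec<U> = vs.into_iter().map(|v| f((k.clone(), v))).collect();
            (k, out)
        })
        .collect()
}
```

With the input from the example above and the closure `|(k, v)| (k, k + v)`, key 1 holds `[(1, 3), (1, 4)]` and key 2 holds `[(2, 6)]`: the tuples are ordinary values and do not create new groups.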
Source

pub fn filter<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&V) -> bool + 'a,

Creates a stream containing only the elements of each group stream that satisfy a predicate f, preserving the order of the elements within the group.

The closure f receives a reference &V rather than an owned value V because filtering does not modify or take ownership of the values. If you need to modify the values while filtering, use KeyedStream::filter_map instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .filter(q!(|&x| x > 2))
// { 1: [3], 2: [4] }
Source

pub fn filter_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&(K, V)) -> bool + 'a,

Creates a stream containing only the elements of each group stream that satisfy a predicate f (which receives the key-value tuple), preserving the order of the elements within the group.

The closure f receives a reference &(K, V) rather than an owned value (K, V) because filtering does not modify or take ownership of the values. If you need to modify the values while filtering, use KeyedStream::filter_map_with_key instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .filter_with_key(q!(|&(k, v)| v - k == 2))
// { 1: [3], 2: [4] }
Source

pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn(V) -> Option<U> + 'a,

An operator that both filters and maps each value, with keys staying the same. It yields only the items for which the supplied closure f returns Some(value). If you need access to the key, see KeyedStream::filter_map_with_key.

§Example
process
    .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    .into_keyed()
    .filter_map(q!(|s| s.parse::<usize>().ok()))
// { 1: [2], 2: [4] }
Source

pub fn filter_map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn((K, V)) -> Option<U> + 'a, K: Clone,

An operator that both filters and maps each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key. It yields only the items for which the supplied closure f returns Some(value).

§Example
process
    .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    .into_keyed()
    .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
// { 2: [2] }
Source

pub fn inspect<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&V) + 'a,

An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each value. This is mainly useful for debugging, and should not be used to generate side-effects.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .inspect(q!(|v| println!("{}", v)))
Source

pub fn inspect_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&(K, V)) + 'a,

An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each key-value pair. This is mainly useful for debugging, and should not be used to generate side-effects.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
Source

pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R>

An operator which allows you to “name” a HydroNode. This is only used for testing, to correlate certain HydroNodes with IDs.

Source§

impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R>

Source

pub fn interleave<O2, R2: MinRetries<R>>( self, other: KeyedStream<K, V, L, Unbounded, O2, R2>, ) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
where R: MinRetries<R2, Min = R2::Min>,

Produces a new keyed stream that “merges” the inputs by interleaving the elements of any overlapping groups. The result has NoOrder on each group because the order of interleaving is not guaranteed. If the keys across both inputs do not overlap, the ordering will be deterministic and you can safely use Self::assume_ordering.

Currently, both input streams must be Unbounded.

§Example
let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
numbers1.interleave(numbers2)
// { 1: [2, 3], 3: [4, 5] } with each group in unknown order
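The effect on overlapping and non-overlapping keys can be pictured with a plain-Rust model. This is a sketch under the assumption that a keyed stream behaves like a map from key to a list of values; `interleave_model` and `keys_disjoint` are hypothetical helpers, not Hydro functions.

```rust
use std::collections::BTreeMap;

// Merge two keyed collections group by group.
// Here `b`'s values are appended after `a`'s, which is only ONE of the
// interleavings the real operator may produce for a shared key.
fn interleave_model<K: Ord, V>(
    a: BTreeMap<K, Vec<V>>,
    b: BTreeMap<K, Vec<V>>,
) -> BTreeMap<K, Vec<V>> {
    let mut merged = a;
    for (k, vs) in b {
        merged.entry(k).or_default().extend(vs);
    }
    merged
}

// True when no key appears in both inputs, i.e. no group is actually mixed.
fn keys_disjoint<K: Ord, V>(a: &BTreeMap<K, Vec<V>>, b: &BTreeMap<K, Vec<V>>) -> bool {
    a.keys().all(|k| !b.contains_key(k))
}
```

When `keys_disjoint` holds, every output group comes from exactly one input, so its order is whatever that input guaranteed. That is the situation in which casting the result with assume_ordering is described as safe above.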
Source§

impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
where K: Eq + Hash, L: Location<'a>,

Source

pub fn scan<A, U, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
where K: Clone, I: Fn() -> A + 'a, F: Fn(&mut A, V) -> Option<U> + 'a,

A special case of Stream::scan for keyed streams. For each key group, the values are transformed via the f combinator.

Unlike Stream::fold_keyed, which only returns the final accumulated value, scan produces a new stream containing all intermediate accumulated values paired with the key. The scan operation can also terminate early by returning None.

The function takes a mutable reference to the accumulator and the current element, and returns an Option<U>. If the function returns Some(value), value is emitted to the output stream. If the function returns None, the group is terminated and no more of its elements are processed.

§Example
process
    .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
    .into_keyed()
    .scan(
        q!(|| 0),
        q!(|acc, x| {
            *acc += x;
            Some(*acc)
        }),
    )
// Output: { 0: [1, 3], 1: [3, 7] }
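The per-key accumulator and the early-termination rule can be modeled in plain Rust. This is a sketch, not the Hydro implementation: `scan_keyed` is a hypothetical helper, and it treats a None result as ending only the group of the key that produced it.

```rust
use std::collections::BTreeMap;

// Per-key scan over (key, value) pairs in arrival order.
// Each key gets its own accumulator from `init`. Once `f` returns None for a
// key, that key is marked finished and its later values are skipped.
fn scan_keyed<K, V, A, U>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, V) -> Option<U>,
) -> BTreeMap<K, Vec<U>>
where
    K: Ord + Clone,
{
    // `None` in this map means "this key's group has terminated".
    let mut state: BTreeMap<K, Option<A>> = BTreeMap::new();
    let mut out: BTreeMap<K, Vec<U>> = BTreeMap::new();
    for (k, v) in pairs {
        let slot = state.entry(k.clone()).or_insert_with(|| Some(init()));
        let finished = match slot.as_mut() {
            Some(acc) => match f(acc, v) {
                Some(u) => {
                    out.entry(k).or_default().push(u);
                    false
                }
                None => true,
            },
            None => false, // already terminated: ignore the value
        };
        if finished {
            *slot = None;
        }
    }
    out
}
```

Running the model with the running-sum closure from the example above reproduces `{ 0: [1, 3], 1: [3, 7] }`. A closure that returns None once the sum exceeds a limit stops only the affected key, while other keys keep emitting.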
Source

pub fn fold_early_stop<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
where K: Clone, I: Fn() -> A + 'a, F: Fn(&mut A, V) -> bool + 'a,

A variant of Stream::fold, intended for keyed streams. The aggregation is executed in order across the values in each group. The aggregation function returns a boolean; when it returns true, the aggregated result for that group is complete and can be released to downstream computation. Unlike Stream::fold_keyed, this means that even if the input stream is crate::Unbounded, the outputs of the fold can be processed like normal stream elements.

§Example
process
    .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    .into_keyed()
    .fold_early_stop(
        q!(|| 0),
        q!(|acc, x| {
            *acc += x;
            x % 2 == 0
        }),
    )
// Output: { 0: 2, 1: 9 }
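A plain-Rust model may help show when a group's result is released. This sketch is not the Hydro implementation: `fold_early_stop_model` is a hypothetical helper, and it assumes that values arriving for a key after its result has been released are ignored.

```rust
use std::collections::BTreeMap;

// Per-key fold that releases a key's accumulator as soon as `f` returns true.
fn fold_early_stop_model<K, V, A>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, V) -> bool,
) -> BTreeMap<K, A>
where
    K: Ord + Clone,
{
    let mut pending: BTreeMap<K, A> = BTreeMap::new();
    let mut released: BTreeMap<K, A> = BTreeMap::new();
    for (k, v) in pairs {
        if released.contains_key(&k) {
            continue; // assumption of this sketch: later values are dropped
        }
        let acc = pending.entry(k.clone()).or_insert_with(|| init());
        if f(acc, v) {
            // The aggregate for this key is complete; move it to the output.
            if let Some(done) = pending.remove(&k) {
                released.insert(k, done);
            }
        }
    }
    released
}
```

On the input from the example above, key 0 is released after its first value (2 is even) with accumulator 2, and key 1 is released after its second value (6 is even) with accumulator 9. A key whose closure never returns true produces no output in this model.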
Source

pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold, aggregates the values in each group via the comb closure.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

If the input and output value types are the same and do not require initialization, use KeyedStream::reduce.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold(q!(|| 0), q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)
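The per-group aggregation can be modeled in plain Rust as a map of accumulators. This is a sketch of the semantics only; `fold_keyed_model` is a hypothetical helper and does not use the Hydro API.

```rust
use std::collections::BTreeMap;

// Per-key fold: each key gets a fresh accumulator from `init`, and `comb`
// is applied to that key's values in arrival order.
fn fold_keyed_model<K: Ord, V, A>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    comb: impl Fn(&mut A, V),
) -> BTreeMap<K, A> {
    let mut acc: BTreeMap<K, A> = BTreeMap::new();
    for (k, v) in pairs {
        comb(acc.entry(k).or_insert_with(|| init()), v);
    }
    acc
}
```

With the summing closure and the input from the example above, the model yields `{1: 5, 2: 7}`. An order-dependent closure such as string concatenation is also well defined here, because the values of a group are visited in a fixed order; that is the property the TotalOrder requirement provides.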
Source

pub fn reduce<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce, aggregates the values in each group via the comb closure.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
// (1, 5), (2, 7)
Source

pub fn reduce_watermark<O, F>( self, other: impl Into<Optional<O, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce where tuples with keys less than the watermark are automatically deleted.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark(watermark, q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (2, 204)
Source§

impl<'a, K, V, L, B: Boundedness, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
where K: Eq + Hash, L: Location<'a>,

Source

pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_commutative, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed.

If the input and output value types are the same and do not require initialization, use KeyedStream::reduce_commutative.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)
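Whether a closure is safe to use here can be spot-checked outside Hydro. The helper below is a hypothetical test utility, not part of the library: it folds the same values forwards and backwards and compares the results. Agreement on one input is necessary but not sufficient evidence of commutativity.

```rust
// Folds `values` in forward and reverse order and reports whether both
// orders produce the same accumulator.
fn same_in_both_orders<A: PartialEq, V: Clone>(
    values: &[V],
    init: impl Fn() -> A,
    comb: impl Fn(&mut A, V),
) -> bool {
    let mut forward = init();
    for v in values.iter().cloned() {
        comb(&mut forward, v);
    }
    let mut backward = init();
    for v in values.iter().rev().cloned() {
        comb(&mut backward, v);
    }
    forward == backward
}
```

Integer addition passes this check, while appending to a String does not, since `"ab"` and `"ba"` differ. A closure of the second kind would make the aggregate depend on an arrival order that is not guaranteed.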
Source

pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_commutative, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_commutative(q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)
Source

pub fn reduce_watermark_commutative<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_commutative where tuples with keys less than the watermark are automatically deleted.

The comb closure must be commutative, as the order of input items is not guaranteed.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (2, 204)
Source§

impl<'a, K, V, L, B: Boundedness, R> KeyedStream<K, V, L, B, TotalOrder, R>
where K: Eq + Hash, L: Location<'a>,

Source

pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

If the input and output value types are the same and do not require initialization, use KeyedStream::reduce_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)
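The idempotence requirement can be spot-checked in plain Rust. The helper below is a hypothetical test utility, not part of Hydro: it applies a closure once and twice with the same value and reports whether the duplicate changed the result.

```rust
// Returns true when delivering `value` twice leaves the accumulator in the
// same state as delivering it once.
fn tolerates_duplicate<A: PartialEq, V: Clone>(
    value: V,
    init: impl Fn() -> A,
    comb: impl Fn(&mut A, V),
) -> bool {
    let mut once = init();
    comb(&mut once, value.clone());
    let mut twice = init();
    comb(&mut twice, value.clone());
    comb(&mut twice, value);
    once == twice
}
```

Boolean OR (as in the example above) and taking a maximum pass this check; integer addition does not, because a retried element would be counted twice.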
Source

pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_idempotent(q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)
Source

pub fn reduce_watermark_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_idempotent where tuples with keys less than the watermark are automatically deleted.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (2, true)
Source§

impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
where K: Eq + Hash, L: Location<'a>,

Source

pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_commutative_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

If the input and output value types are the same and do not require initialization, use KeyedStream::reduce_commutative_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)
Source

pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_commutative_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)
Source

pub fn reduce_watermark_commutative_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_commutative_idempotent where tuples with keys less than the watermark are automatically deleted.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (2, true)
Source

pub fn filter_key_not_in<O2, R2>( self, other: Stream<K, L, Bounded, O2, R2>, ) -> Self

Given a bounded stream of keys K, returns a new keyed stream containing only the groups whose keys are not in the bounded stream.

§Example
let tick = process.tick();
let keyed_stream = process
    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    .batch(&tick, nondet!(/** test */))
    .into_keyed();
let keys_to_remove = process
    .source_iter(q!(vec![1, 2]))
    .batch(&tick, nondet!(/** test */));
keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
// { 3: ['c'], 4: ['d'] }
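The filtering rule amounts to an anti-join on keys. A plain-Rust model, assuming a keyed stream can be pictured as a map from key to values, might look like the following; `filter_key_not_in_model` is a hypothetical helper and not the Hydro implementation.

```rust
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;

// Keeps only the groups whose key does not occur in `keys_to_remove`.
// Duplicate keys in `keys_to_remove` have no extra effect.
fn filter_key_not_in_model<K: Ord + Hash + Eq, V>(
    groups: BTreeMap<K, Vec<V>>,
    keys_to_remove: impl IntoIterator<Item = K>,
) -> BTreeMap<K, Vec<V>> {
    let banned: HashSet<K> = keys_to_remove.into_iter().collect();
    groups
        .into_iter()
        .filter(|(k, _)| !banned.contains(k))
        .collect()
}
```

The model collects the removal keys into a set before filtering, which is one way to see why that input is required to be Bounded: the full set of keys to exclude has to be known before any group can be passed through.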
Source§

impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
where L: Location<'a> + NoTick + NoAtomic,

Source

pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R>

Source

pub fn batch( self, tick: &Tick<L>, nondet: NonDet, ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>

Given a tick, returns a keyed stream corresponding to a batch of elements segmented by that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.

§Non-Determinism

The batch boundaries are non-deterministic and may change across executions.
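The combination of "contiguous and order-preserving" with "non-deterministic boundaries" can be illustrated in plain Rust. In this sketch, `split_into_batches` is a hypothetical helper and the `cuts` argument stands in for tick boundaries, which the real runtime chooses on its own.

```rust
// Cuts an ordered input at the given positions.
// `cuts` is assumed to be sorted and within bounds of `input`.
fn split_into_batches<T: Clone>(input: &[T], cuts: &[usize]) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut start = 0;
    for &cut in cuts {
        batches.push(input[start..cut].to_vec());
        start = cut;
    }
    // Everything after the last cut forms the final batch.
    batches.push(input[start..].to_vec());
    batches
}
```

Two different choices of `cuts` give different batches, yet concatenating the batches of either run restores the original input. Per-batch results, such as a fold applied inside each tick, can therefore differ between executions even though no element is lost or reordered.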

Source§

impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
where L: Location<'a> + NoTick + NoAtomic,

Source

pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>

Returns a keyed stream corresponding to the latest batch of elements being atomically processed. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.

§Non-Determinism

The batch boundaries are non-deterministic and may change across executions.

Source§

impl<'a, K, V, L, O, R> KeyedStream<K, V, L, Bounded, O, R>
where L: Location<'a>,

Source

pub fn chain<O2>( self, other: KeyedStream<K, V, L, Bounded, O2, R>, ) -> KeyedStream<K, V, L, Bounded, O::Min, R>
where O: MinOrder<O2>,

Source§

impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>
where L: Location<'a>,

Source

pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R>

Trait Implementations§

Source§

impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order, Retries> Clone for KeyedStream<K, V, Loc, Bound, Order, Retries>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value.
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl<'a, K, V, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
where L: Location<'a> + NoTick,

Source§

type Location = L

Source§

fn create_source(ident: Ident, location: L) -> Self

Source§

impl<'a, K, V, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
where L: Location<'a> + NoTick,

Source§

fn complete(self, ident: Ident, expected_location: LocationId)

Source§

impl<'a, K, V, L, B: Boundedness, R> From<KeyedStream<K, V, L, B, TotalOrder, R>> for KeyedStream<K, V, L, B, NoOrder, R>
where L: Location<'a>,

Source§

fn from( stream: KeyedStream<K, V, L, B, TotalOrder, R>, ) -> KeyedStream<K, V, L, B, NoOrder, R>

Converts to this type from the input type.

Auto Trait Implementations§

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Freeze for KeyedStream<K, V, Loc, Bound, Order, Retries>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !RefUnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retries>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Send for KeyedStream<K, V, Loc, Bound, Order, Retries>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !Sync for KeyedStream<K, V, Loc, Bound, Order, Retries>

§

impl<K, V, Loc, Bound, Order, Retries> Unpin for KeyedStream<K, V, Loc, Bound, Order, Retries>
where Loc: Unpin, Order: Unpin, Bound: Unpin, Retries: Unpin, K: Unpin, V: Unpin,

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> !UnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retries>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
Source§

impl<T> DynClone for T
where T: Clone,

Source§

fn __clone_box(&self, _: Private) -> *mut ()

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> MinOrder<T> for T

Source§

type Min = T

The weaker of the two orderings.
Source§

impl<T> MinRetries<T> for T

Source§

type Min = T

The weaker of the two retry guarantees.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
§

impl<T> ErasedDestructor for T
where T: 'static,