Struct KeyedStream 

pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order: Ordering = TotalOrder, Retry: Retries = ExactlyOnce> { /* private fields */ }
Streaming elements of type V grouped by a key of type K.

Keyed streams capture streaming elements of type V grouped by a key of type K, where the order of keys is non-deterministic but the order within each group may be deterministic.

Although keyed streams are conceptually grouped by keys, values are not immediately grouped into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an operator such as KeyedStream::fold is called, which requires K: Hash + Eq.
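The deferred grouping described above can be modeled in plain Rust, independently of Hydro. This is a minimal sketch, assuming a keyed stream is simply a sequence of (key, value) pairs; the function name `group_in_arrival_order` is hypothetical and not part of hydro_lang. Buckets are only built when an aggregation asks for them, which is the point where `K: Hash + Eq` becomes necessary.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model: groups (key, value) pairs on demand.
/// Values inside each group keep their arrival order; the order of the
/// keys themselves is unspecified (HashMap iteration order).
fn group_in_arrival_order<K: Hash + Eq, V>(pairs: Vec<(K, V)>) -> HashMap<K, Vec<V>> {
    let mut groups: HashMap<K, Vec<V>> = HashMap::new();
    for (k, v) in pairs {
        groups.entry(k).or_default().push(v);
    }
    groups
}
```

Calling `group_in_arrival_order(vec![(1, 2), (2, 4), (1, 3)])` yields the groups `1: [2, 3]` and `2: [4]`, with no promise about which key is visited first.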

Type Parameters:

  • K: the type of the key for each group
  • V: the type of the elements inside each group
  • Loc: the Location where the keyed stream is materialized
  • Bound: tracks whether the entries are Bounded (local and finite) or Unbounded (asynchronous and possibly infinite)
  • Order: tracks whether the elements within each group have deterministic order (TotalOrder) or not (NoOrder)
  • Retry: tracks whether the elements within each group have deterministic cardinality (ExactlyOnce) or may have non-deterministic retries (AtLeastOnce)

Implementations§

impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>

pub fn demux_bincode( self, other: &Cluster<'a, L2>, ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>

Sends each group of this stream to a specific member of a cluster, with the MemberId key identifying the recipient for each group and using bincode to serialize/deserialize messages.

Each key must be a MemberId<L2> and each value must be a T, where the key specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode, this API targets specific cluster members rather than broadcasting to all members.

§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
    .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
    .into_keyed()
    .demux_bincode(&workers);
// if there are 4 members in the cluster, each receives one element
// - MemberId::<()>(0): [0]
// - MemberId::<()>(1): [1]
// - MemberId::<()>(2): [2]
// - MemberId::<()>(3): [3]
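The routing in the example above can be modeled without any networking. This is a hedged sketch in plain Rust, not the hydro_lang implementation: `demux_model` is a hypothetical name, cluster members are represented by `usize` indices instead of MemberId, and dropping out-of-range recipients is an assumption of the sketch rather than documented Hydro behavior.

```rust
/// Hypothetical model of demultiplexing: `members` is the cluster size and
/// each pair is (recipient index, payload). Returns one inbox per member.
fn demux_model<T>(pairs: Vec<(usize, T)>, members: usize) -> Vec<Vec<T>> {
    let mut inboxes: Vec<Vec<T>> = (0..members).map(|_| Vec::new()).collect();
    for (recipient, payload) in pairs {
        // Assumption for this sketch only: unknown recipients are dropped.
        if let Some(inbox) = inboxes.get_mut(recipient) {
            inbox.push(payload);
        }
    }
    inboxes
}
```

With four members and the pairs `(0, 0), (1, 1), (2, 2), (3, 3)`, each inbox ends up holding exactly one element, matching the comment in the example.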

impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>

pub fn demux_bincode( self, other: &Cluster<'a, L2>, ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>

Sends each group of this stream at each source member to a specific member of a destination cluster, with the MemberId key identifying the recipient for each group and using bincode to serialize/deserialize messages.

Each key must be a MemberId<L2> and each value must be a T, where the key specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode, this API targets specific cluster members rather than broadcasting to all members.

Each cluster member sends its local stream elements, and they are collected at each destination member as a KeyedStream where keys identify the source cluster member.

§Example
let source: Cluster<Source> = flow.cluster::<Source>();
let to_send: KeyedStream<_, _, Cluster<_>, _> = source
    .source_iter(q!(vec![0, 1, 2, 3]))
    .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
    .into_keyed();
let destination: Cluster<Destination> = flow.cluster::<Destination>();
let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
// if there are 4 members in the destination cluster, each receives one message from each source member
// - Destination(0): { Source(0): [0], Source(1): [0], ... }
// - Destination(1): { Source(0): [1], Source(1): [1], ... }
// - ...
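The cluster-to-cluster case differs in that the receiver learns who sent each element. The sketch below models that in plain Rust under stated assumptions: `demux_cluster_model` is a hypothetical name, members are `usize` indices, and out-of-range recipients are silently dropped for simplicity. It is not the hydro_lang implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical model: `outgoing[s]` holds the (recipient, payload) pairs
/// produced at source member `s`. The result has one map per destination
/// member, keyed by the index of the source member that sent each payload.
fn demux_cluster_model<T>(
    outgoing: Vec<Vec<(usize, T)>>,
    destinations: usize,
) -> Vec<BTreeMap<usize, Vec<T>>> {
    let mut inboxes: Vec<BTreeMap<usize, Vec<T>>> =
        (0..destinations).map(|_| BTreeMap::new()).collect();
    for (source, pairs) in outgoing.into_iter().enumerate() {
        for (recipient, payload) in pairs {
            // Assumption for this sketch only: unknown recipients are dropped.
            if let Some(inbox) = inboxes.get_mut(recipient) {
                inbox.entry(source).or_default().push(payload);
            }
        }
    }
    inboxes
}
```

If two source members each send `(0, 0)` and `(1, 1)`, destination 0 ends up with `{ 0: [0], 1: [0] }` and destination 1 with `{ 0: [1], 1: [1] }`, mirroring the shape shown in the example.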

impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>

pub fn assume_ordering<O2: Ordering>( self, _nondet: NonDet, ) -> KeyedStream<K, V, L, B, O2, R>

Explicitly “casts” the keyed stream to a type with a different ordering guarantee for each group. Useful in unsafe code where the ordering cannot be proven by the type system.

§Non-Determinism

This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.

pub fn assume_retries<R2: Retries>( self, nondet: NonDet, ) -> KeyedStream<K, V, L, B, O, R2>

Explicitly “casts” the keyed stream to a type with a different retries guarantee for each group. Useful in unsafe code where the lack of retries cannot be proven by the type system.

§Non-Determinism

This function is used as an escape hatch, and any mistakes in the provided retries guarantee will propagate into the guarantees for the rest of the program.

pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R>

Flattens the keyed stream into an unordered stream of key-value pairs.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .entries()
// (1, 2), (1, 3), (2, 4) in any order

pub fn values(self) -> Stream<V, L, B, NoOrder, R>

Flattens the keyed stream into an unordered stream of only the values.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .values()
// 2, 3, 4 in any order

pub fn map<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn(V) -> U + 'a,

Transforms each value by invoking f on each element, with keys staying the same after transformation. If you need access to the key, see KeyedStream::map_with_key.

If you do not want to modify the stream and instead only want to view each item use KeyedStream::inspect instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .map(q!(|v| v + 1))
// { 1: [3, 4], 2: [5] }

pub fn map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn((K, V)) -> U + 'a, K: Clone,

Transforms each value by invoking f on each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key.

If you do not want to modify the stream and instead only want to view each item use KeyedStream::inspect_with_key instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .map_with_key(q!(|(k, v)| k + v))
// { 1: [3, 4], 2: [6] }
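The "not re-grouped" rule can be illustrated with a std-only model. This is a sketch under assumptions: `map_with_key_model` is a hypothetical name, and a keyed stream is represented as a plain vector of pairs. It also shows one plausible reason for the `K: Clone` bound, since the key is both handed to the closure and kept for the output.

```rust
/// Hypothetical model of `map_with_key`: the closure sees (key, value),
/// but its result is always stored under the original key.
fn map_with_key_model<K: Clone, V, U>(
    pairs: Vec<(K, V)>,
    f: impl Fn((K, V)) -> U,
) -> Vec<(K, U)> {
    pairs
        .into_iter()
        // Clone the key first, then move the original pair into the closure.
        .map(|(k, v)| (k.clone(), f((k, v))))
        .collect()
}
```

For instance, mapping `(1, 2)` with `|(k, v)| (k * 10, v)` produces the value `(10, 2)` still filed under key `1`; the tuple's first component is not treated as a new key.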

pub fn filter<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&V) -> bool + 'a,

Creates a stream containing only the elements of each group stream that satisfy a predicate f, preserving the order of the elements within the group.

The closure f receives a reference &V rather than an owned value v because filtering does not modify or take ownership of the values. If you need to modify the values while filtering use KeyedStream::filter_map instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .filter(q!(|&x| x > 2))
// { 1: [3], 2: [4] }

pub fn filter_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&(K, V)) -> bool + 'a,

Creates a stream containing only the elements of each group stream that satisfy a predicate f (which receives the key-value tuple), preserving the order of the elements within the group.

The closure f receives a reference &(K, V) rather than an owned value (K, V) because filtering does not modify or take ownership of the values. If you need to modify the values while filtering use KeyedStream::filter_map_with_key instead.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .filter_with_key(q!(|&(k, v)| v - k == 2))
// { 1: [3], 2: [4] }

pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn(V) -> Option<U> + 'a,

An operator that both filters and maps each value, with keys staying the same. It yields only the items for which the supplied closure f returns Some(value). If you need access to the key, see KeyedStream::filter_map_with_key.

§Example
process
    .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    .into_keyed()
    .filter_map(q!(|s| s.parse::<usize>().ok()))
// { 1: [2], 2: [4] }

pub fn filter_map_with_key<U, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where F: Fn((K, V)) -> Option<U> + 'a, K: Clone,

An operator that both filters and maps each key-value pair. The resulting values are not re-grouped even if they are tuples; instead they will be grouped under the original key. It yields only the items for which the supplied closure f returns Some(value).

§Example
process
    .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    .into_keyed()
    .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
// { 2: [2] }

pub fn flat_map_ordered<U, I, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, O, R>
where I: IntoIterator<Item = U>, F: Fn(V) -> I + 'a, K: Clone,

For each value v in each group, transform v using f and then treat the result as an Iterator to produce values one by one within the same group. The Iterator implementation for the output type I must produce items in a deterministic order.

For example, I could be a Vec, but not a HashSet. If the order of the items in I is not deterministic, use KeyedStream::flat_map_unordered instead.

§Example
process
    .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    .into_keyed()
    .flat_map_ordered(q!(|x| x))
// { 1: [2, 3, 4], 2: [5, 6] }

pub fn flat_map_unordered<U, I, F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, NoOrder, R>
where I: IntoIterator<Item = U>, F: Fn(V) -> I + 'a, K: Clone,

Like KeyedStream::flat_map_ordered, but allows the implementation of Iterator for the output type I to produce items in any order.

§Example
process
    .source_iter(q!(vec![
        (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
        (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ]))
    .into_keyed()
    .flat_map_unordered(q!(|x| x))
// { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
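The difference between the ordered and unordered variants comes down to the iteration order of the collection returned by the closure. The following std-only sketch (the name `flat_map_model` is hypothetical, and a keyed stream is modeled as a vector of pairs) makes that visible: the output order inside a group is exactly the order in which the inner collection yields its items.

```rust
/// Hypothetical model of a keyed flat map: every item produced by `f(v)`
/// is emitted under the key of `v`, in the order the iterator yields it.
fn flat_map_model<K: Clone, V, I: IntoIterator>(
    pairs: Vec<(K, V)>,
    f: impl Fn(V) -> I,
) -> Vec<(K, I::Item)> {
    let mut out = Vec::new();
    for (k, v) in pairs {
        for item in f(v) {
            out.push((k.clone(), item));
        }
    }
    out
}
```

With `Vec` values the result can be compared position by position. With `std::collections::HashSet` values the same call only determines which items appear, so a caller has to sort or otherwise ignore order before comparing.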

pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
where V: IntoIterator<Item = U>, K: Clone,

For each value v in each group, treat v as an Iterator and produce its items one by one within the same group. The Iterator implementation for the value type V must produce items in a deterministic order.

For example, V could be a Vec, but not a HashSet. If the order of the items in V is not deterministic, use KeyedStream::flatten_unordered instead.

§Example
process
    .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    .into_keyed()
    .flatten_ordered()
// { 1: [2, 3, 4], 2: [5, 6] }

pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
where V: IntoIterator<Item = U>, K: Clone,

Like KeyedStream::flatten_ordered, but allows the implementation of Iterator for the value type V to produce items in any order.

§Example
process
    .source_iter(q!(vec![
        (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
        (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ]))
    .into_keyed()
    .flatten_unordered()
// { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order

pub fn inspect<F>( self, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&V) + 'a,

An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each value. This is mainly useful for debugging, and should not be used to generate side-effects.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .inspect(q!(|v| println!("{}", v)))

pub fn inspect_with_key<F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> KeyedStream<K, V, L, B, O, R>
where F: Fn(&(K, V)) + 'a,

An operator which allows you to “inspect” each element of a stream without modifying it. The closure f is called on a reference to each key-value pair. This is mainly useful for debugging, and should not be used to generate side-effects.

§Example
process
    .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    .into_keyed()
    .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))

pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R>

An operator which allows you to “name” a HydroNode. This is only used for testing, to correlate certain HydroNodes with IDs.

impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R>

pub fn interleave<O2: Ordering, R2: Retries>( self, other: KeyedStream<K, V, L, Unbounded, O2, R2>, ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
where R: MinRetries<R2>,

Produces a new keyed stream that “merges” the inputs by interleaving the elements of any overlapping groups. The result has NoOrder on each group because the order of interleaving is not guaranteed. If the keys across both inputs do not overlap, the ordering will be deterministic and you can safely use Self::assume_ordering.

Currently, both input streams must be Unbounded.

§Example
let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
numbers1.interleave(numbers2)
// { 1: [2, 3], 3: [4, 5] } with each group in unknown order
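To see why the result carries NoOrder, it helps to make the scheduling explicit. The sketch below is a plain-Rust model, not Hydro's runtime: `interleave_with_schedule` is a hypothetical name, and the `take_from_a` slice stands in for whatever timing decides which input is read next.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model: merges two keyed inputs according to a schedule.
/// `take_from_a[i]` says whether step `i` pulls from `a` (true) or `b` (false);
/// anything left over afterwards is drained, `a` first.
fn interleave_with_schedule<K: Hash + Eq, V>(
    a: Vec<(K, V)>,
    b: Vec<(K, V)>,
    take_from_a: &[bool],
) -> HashMap<K, Vec<V>> {
    let mut a = a.into_iter();
    let mut b = b.into_iter();
    let mut merged: Vec<(K, V)> = Vec::new();
    for &from_a in take_from_a {
        let next = if from_a { a.next() } else { b.next() };
        merged.extend(next);
    }
    merged.extend(a);
    merged.extend(b);

    let mut groups: HashMap<K, Vec<V>> = HashMap::new();
    for (k, v) in merged {
        groups.entry(k).or_default().push(v);
    }
    groups
}
```

Running the two inputs from the example with the schedules `[true, true]` and `[false, false]` gives `1: [2, 3]` in one case and `1: [3, 2]` in the other. If the key sets were disjoint, every schedule would produce the same groups, which is the situation in which assume_ordering is safe.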

impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
where K: Eq + Hash, L: Location<'a>,

pub fn scan<A, U, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
where K: Clone, I: Fn() -> A + 'a, F: Fn(&mut A, V) -> Option<U> + 'a,

A special case of Stream::scan for keyed streams. For each key group the values are transformed via the f combinator.

Unlike Stream::fold_keyed which only returns the final accumulated value, scan produces a new stream containing all intermediate accumulated values paired with the key. The scan operation can also terminate early by returning None.

The function takes a mutable reference to the accumulator and the current element, and returns an Option<U>. If the function returns Some(value), value is emitted to the output stream. If the function returns None, that group is terminated and no more of its elements are processed.

§Example
process
    .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    .into_keyed()
    .scan(
        q!(|| 0),
        q!(|acc, x| {
            *acc += x;
            if *acc % 2 == 0 { None } else { Some(*acc) }
        }),
    )
// Output: { 0: [1], 1: [3, 7] }
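The per-group state and early termination can be modeled with two std collections. This is a minimal sketch, assuming a keyed stream is a vector of pairs in arrival order; `keyed_scan_model` is a hypothetical name and the code is not the hydro_lang implementation.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Hypothetical model of a keyed scan: each key gets its own accumulator,
/// and a `None` from `f` stops further processing for that key only.
fn keyed_scan_model<K: Hash + Eq + Clone, V, A, U>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, V) -> Option<U>,
) -> Vec<(K, U)> {
    let mut state: HashMap<K, A> = HashMap::new();
    let mut finished: HashSet<K> = HashSet::new();
    let mut out = Vec::new();
    for (k, v) in pairs {
        if finished.contains(&k) {
            continue; // this group already returned None
        }
        let acc = state.entry(k.clone()).or_insert_with(&init);
        match f(acc, v) {
            Some(u) => out.push((k, u)),
            None => {
                finished.insert(k);
            }
        }
    }
    out
}
```

Feeding the model the input from the example reproduces `0: [1]` and `1: [3, 7]`: key 0 stops at its second element, while key 1 keeps running.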

pub fn generator<A, U, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
where K: Clone, I: Fn() -> A + 'a, F: Fn(&mut A, V) -> Generate<U> + 'a,

Iteratively processes the elements in each group using a state machine that can yield elements as it processes its inputs. This is designed to mirror the unstable generator syntax in Rust, without requiring special syntax.

Like KeyedStream::scan, this function takes in an initializer that emits the initial state for each group. The second argument defines the processing logic, taking in a mutable reference to the group’s state and the value to be processed. It emits a Generate value, whose variants define what is emitted and whether further inputs should be processed.

§Example
process
    .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    .into_keyed()
    .generator(
        q!(|| 0),
        q!(|acc, x| {
            *acc += x;
            if *acc > 100 {
                hydro_lang::live_collections::keyed_stream::Generate::Return(
                    "done!".to_string()
                )
            } else if *acc % 2 == 0 {
                hydro_lang::live_collections::keyed_stream::Generate::Yield(
                    "even".to_string()
                )
            } else {
                hydro_lang::live_collections::keyed_stream::Generate::Continue
            }
        }),
    )
// Output: { 0: ["even", "done!"], 1: ["even"] }
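The state machine behavior can be sketched with a stand-in enum. Everything here is an illustration under assumptions: `GenerateModel` and `keyed_generator_model` are hypothetical names, only the three variants used in the example above are modeled, and the real Generate type may offer more.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Stand-in for the `Generate` type; models only the variants used above.
enum GenerateModel<U> {
    Yield(U),  // emit a value and keep processing the group
    Return(U), // emit a final value and stop processing the group
    Continue,  // emit nothing and keep processing the group
}

/// Hypothetical model of a keyed generator over pairs in arrival order.
fn keyed_generator_model<K: Hash + Eq + Clone, V, A, U>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, V) -> GenerateModel<U>,
) -> Vec<(K, U)> {
    let mut state: HashMap<K, A> = HashMap::new();
    let mut finished: HashSet<K> = HashSet::new();
    let mut out = Vec::new();
    for (k, v) in pairs {
        if finished.contains(&k) {
            continue;
        }
        let acc = state.entry(k.clone()).or_insert_with(&init);
        match f(acc, v) {
            GenerateModel::Yield(u) => out.push((k, u)),
            GenerateModel::Return(u) => {
                finished.insert(k.clone());
                out.push((k, u));
            }
            GenerateModel::Continue => {}
        }
    }
    out
}
```

On the example input, key 0 yields "even" at a running sum of 4 and returns "done!" at 104, so its trailing `(0, 10)` is never processed; key 1 yields "even" once its sum reaches 10.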

pub fn fold_early_stop<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L> + Copy, f: impl IntoQuotedMut<'a, F, L> + Copy, ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
where K: Clone, I: Fn() -> A + 'a, F: Fn(&mut A, V) -> bool + 'a,

A variant of Stream::fold, intended for keyed streams. The aggregation is executed in order across the values in each group, but the aggregation function returns a boolean which, when true, indicates that the aggregated result is complete and can be released to downstream computation. Unlike Stream::fold_keyed, this means that even if the input stream is Unbounded, the outputs of the fold can be processed like normal stream elements.

§Example
process
    .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    .into_keyed()
    .fold_early_stop(
        q!(|| 0),
        q!(|acc, x| {
            *acc += x;
            x % 2 == 0
        }),
    )
// Output: { 0: 2, 1: 9 }
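A std-only model makes the "release on true" rule concrete. This is a sketch under assumptions: `fold_early_stop_model` is a hypothetical name, and the choice to produce no output for groups that never signal completion is an assumption of the model, not a documented guarantee.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model: folds each group until `f` returns true, then
/// releases that group's accumulator and ignores its later values.
fn fold_early_stop_model<K: Hash + Eq + Clone, V, A>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    f: impl Fn(&mut A, V) -> bool,
) -> HashMap<K, A> {
    let mut pending: HashMap<K, A> = HashMap::new();
    let mut released: HashMap<K, A> = HashMap::new();
    for (k, v) in pairs {
        if released.contains_key(&k) {
            continue;
        }
        let acc = pending.entry(k.clone()).or_insert_with(&init);
        if f(acc, v) {
            let done = pending.remove(&k).expect("accumulator was just inserted");
            released.insert(k, done);
        }
    }
    // Assumption: groups still in `pending` are not emitted.
    released
}
```

For the example input, key 0 is released after its first value (2 is even) and key 1 after its second (6 is even, sum 9), matching `{ 0: 2, 1: 9 }`.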

pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
where K: Clone,

Gets the first element inside each group of values as a KeyedSingleton that preserves the original group keys. Requires the input stream to have TotalOrder guarantees, otherwise the first element would be non-deterministic.

§Example
process
    .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    .into_keyed()
    .first()
// Output: { 0: 2, 1: 3 }

pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold, aggregates the values in each group via the comb closure.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

If the input and output value types are the same and do not require initialization then use KeyedStream::reduce.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold(q!(|| 0), q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)
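The aggregation itself, stripped of ticks and batching, behaves like a per-key fold over pairs in arrival order. The sketch below models only that part in plain Rust; `keyed_fold_model` is a hypothetical name and the code makes no claim about how Hydro schedules or stores state.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of a keyed fold: one accumulator per key, created
/// by `init` on first use and updated by `comb` in arrival order.
fn keyed_fold_model<K: Hash + Eq, V, A>(
    pairs: Vec<(K, V)>,
    init: impl Fn() -> A,
    comb: impl Fn(&mut A, V),
) -> HashMap<K, A> {
    let mut acc: HashMap<K, A> = HashMap::new();
    for (k, v) in pairs {
        comb(acc.entry(k).or_insert_with(&init), v);
    }
    acc
}
```

Because values are combined in arrival order, an order-sensitive closure such as string concatenation is well defined here: folding `(1, "a"), (1, "b")` with `push_str` gives `"ab"`, never `"ba"`. That is the property the TotalOrder requirement protects.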

pub fn reduce<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce, aggregates the values in each group via the comb closure.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
// (1, 5), (2, 7)

pub fn reduce_watermark<O, F>( self, other: impl Into<Optional<O, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce where tuples with keys less than the watermark are automatically deleted.

Each group must have a TotalOrder guarantee, which means that the comb closure is allowed to depend on the order of elements in the group.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark(watermark, q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (2, 204)

impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
where K: Eq + Hash, L: Location<'a>,

pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_commutative, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed.

If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_commutative.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)
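Commutativity can be spot-checked for a candidate closure before relying on it. The helper names below (`fold_group`, `same_result_in_both_orders`) are hypothetical, and the check is only a sketch: agreeing on two arrival orders does not prove commutativity, but disagreeing on them disproves it.

```rust
/// Folds one group's values in the given arrival order.
fn fold_group<V, A>(values: Vec<V>, init: A, comb: impl Fn(&mut A, V)) -> A {
    let mut acc = init;
    for v in values {
        comb(&mut acc, v);
    }
    acc
}

/// Returns true if `comb` gives the same result for the forward and the
/// reversed arrival order of one group's values.
fn same_result_in_both_orders<V: Clone, A: PartialEq + Clone>(
    forward: Vec<V>,
    init: A,
    comb: impl Fn(&mut A, V),
) -> bool {
    let mut backward = forward.clone();
    backward.reverse();
    fold_group(forward, init.clone(), &comb) == fold_group(backward, init, &comb)
}
```

Addition passes this check on `[2, 3]`, while a positional encoding such as `*acc = *acc * 10 + x` fails it (23 versus 32), so only the former is a reasonable candidate for fold_commutative.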

pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_commutative, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_commutative(q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (1, 5), (2, 7)

pub fn reduce_watermark_commutative<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_commutative where tuples with keys less than the watermark are automatically deleted.

The comb closure must be commutative, as the order of input items is not guaranteed.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
    .entries()
    .all_ticks()
// (2, 204)

impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
where K: Eq + Hash, L: Location<'a>,

pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)
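Idempotence can be spot-checked in the same spirit by replaying every element. The names `fold_values` and `unaffected_by_duplicates` are hypothetical, and delivering each element exactly twice in a row is just one retry pattern chosen for the sketch, so passing the check is evidence rather than proof.

```rust
/// Folds one group's values in the given arrival order.
fn fold_values<V, A>(values: Vec<V>, init: A, comb: impl Fn(&mut A, V)) -> A {
    let mut acc = init;
    for v in values {
        comb(&mut acc, v);
    }
    acc
}

/// Returns true if delivering every element twice in a row (as a retry
/// might) leaves the folded result unchanged.
fn unaffected_by_duplicates<V: Clone, A: PartialEq + Clone>(
    values: Vec<V>,
    init: A,
    comb: impl Fn(&mut A, V),
) -> bool {
    let duplicated: Vec<V> = values
        .iter()
        .flat_map(|v| [v.clone(), v.clone()])
        .collect();
    fold_values(values, init.clone(), &comb) == fold_values(duplicated, init, &comb)
}
```

Boolean OR and `max` pass this check, whereas a running sum does not: `[2, 3]` sums to 5 once and to 10 when every element is replayed.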

pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_idempotent(q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)

pub fn reduce_watermark_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_idempotent where tuples with keys less than the watermark are automatically deleted.

The comb closure must be idempotent, as there may be non-deterministic duplicates.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (2, true)

impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
where K: Eq + Hash, L: Location<'a>,

pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>

Like Stream::fold_commutative_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

If the input and output value types are the same and do not require initialization then use KeyedStream::reduce_commutative_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)

pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>

Like Stream::reduce_commutative_idempotent, aggregates the values in each group via the comb closure.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

If you need the accumulated value to have a different type than the input, use KeyedStream::fold_commutative_idempotent.

§Example
let tick = process.tick();
let numbers = process
    .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (1, false), (2, true)

pub fn reduce_watermark_commutative_idempotent<O2, F>( self, other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>, comb: impl IntoQuotedMut<'a, F, L>, ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where O2: Clone, F: Fn(&mut V, V) + 'a,

A special case of KeyedStream::reduce_commutative_idempotent where tuples with keys less than the watermark are automatically deleted.

The comb closure must be commutative, as the order of input items is not guaranteed, and idempotent, as there may be non-deterministic duplicates.

§Example
let tick = process.tick();
let watermark = tick.singleton(q!(1));
let numbers = process
    .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    .into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
    .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
    .entries()
    .all_ticks()
// (2, true)

pub fn filter_key_not_in<O2: Ordering, R2: Retries>( self, other: Stream<K, L, Bounded, O2, R2>, ) -> Self

Given a bounded stream of keys K, returns a new keyed stream containing only the groups whose keys are not in the bounded stream.

§Example
let tick = process.tick();
let keyed_stream = process
    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    .batch(&tick, nondet!(/** test */))
    .into_keyed();
let keys_to_remove = process
    .source_iter(q!(vec![1, 2]))
    .batch(&tick, nondet!(/** test */));
keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
// { 3: ['c'], 4: ['d'] }
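The filtering rule amounts to an anti-join on keys. The sketch below models it with std collections only; `filter_key_not_in_model` is a hypothetical name. One way to read the Bounded requirement on the key stream is that the complete set of keys to remove has to be known before any group can safely be let through, which is what building the `HashSet` up front mirrors.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical model: keeps only the pairs whose key does not occur in
/// `remove`. Duplicate keys in `remove` have no additional effect.
fn filter_key_not_in_model<K: Hash + Eq, V>(
    pairs: Vec<(K, V)>,
    remove: Vec<K>,
) -> Vec<(K, V)> {
    let remove: HashSet<K> = remove.into_iter().collect();
    pairs
        .into_iter()
        .filter(|(k, _)| !remove.contains(k))
        .collect()
}
```

Applied to the example data with keys 1 and 2 removed, only the pairs `(3, 'c')` and `(4, 'd')` remain.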

impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
where L: Location<'a> + NoTick + NoAtomic,

pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R>

Shifts this keyed stream into an atomic context, which guarantees that any downstream logic will all be executed synchronously before any outputs are yielded (in KeyedStream::end_atomic).

This is useful to enforce local consistency constraints, such as ensuring that a write is processed before an acknowledgement is emitted. Entering an atomic section requires a Tick argument that declares where the stream will be atomically processed. Batching a stream into the same Tick will preserve the synchronous execution, while batching into a different Tick will introduce asynchrony.

pub fn batch( self, tick: &Tick<L>, nondet: NonDet, ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>

Given a tick, returns a keyed stream corresponding to a batch of elements segmented by that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.

§Non-Determinism

The batch boundaries are non-deterministic and may change across executions.
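The two guarantees, contiguity and order preservation, can be stated as a property of a simple splitting function. This is a plain-Rust sketch with a hypothetical name (`split_into_batches`); the `cuts` slice stands in for the non-deterministic tick boundaries and is an assumption of the model.

```rust
/// Hypothetical model of batching: splits `items` at the given cut
/// positions. Cuts are clamped so that batches never overlap or go
/// out of range; whatever remains after the last cut forms a final batch.
fn split_into_batches<T: Clone>(items: &[T], cuts: &[usize]) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut start = 0;
    for &cut in cuts {
        let end = cut.clamp(start, items.len());
        batches.push(items[start..end].to_vec());
        start = end;
    }
    batches.push(items[start..].to_vec());
    batches
}
```

Whatever cuts are chosen, concatenating the batches gives back the original sequence. Only the boundaries differ between runs, which is why downstream logic should not depend on where one batch ends and the next begins.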

impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
where L: Location<'a> + NoTick + NoAtomic,

pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>

Returns a keyed stream corresponding to the latest batch of elements being atomically processed. These batches are guaranteed to be contiguous across ticks and preserve the order of the input. The output keyed stream will execute in the Tick that was used to create the atomic section.

§Non-Determinism

The batch boundaries are non-deterministic and may change across executions.

pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R>

Yields the elements of this keyed stream back into a top-level, asynchronous execution context. See KeyedStream::atomic for more details.

impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
where L: Location<'a>,

pub fn chain<O2: Ordering>( self, other: KeyedStream<K, V, L, Bounded, O2, R>, ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
where O: MinOrder<O2>,

Produces a new keyed stream that combines the groups of the inputs by first emitting the elements of the self stream and then emitting the elements of the other stream (if a key is only present in one of the inputs, its values are passed through as-is). The output has a TotalOrder guarantee if and only if both inputs have a TotalOrder guarantee.

Currently, both input streams must be Bounded. This operator will block on the first stream until all its elements are available. In a future version, we will relax the requirement on the other stream.

§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
// { 0: [2, 1], 1: [4, 3] }
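The per-key result in the example can be reproduced with a plain-Rust model of the grouping semantics. This sketch does not use the Hydro API: `Groups` and `chain_groups` are hypothetical names, and a `BTreeMap` is used only to make the model easy to compare, whereas the order of keys in a real keyed stream is non-deterministic.

```rust
use std::collections::BTreeMap;

/// Model of a bounded keyed stream: each key maps to its ordered values.
type Groups = BTreeMap<u32, Vec<i32>>;

/// Plain-Rust model of chaining: for every key, the values from `first`
/// come before the values from `second`. A key present in only one input
/// keeps its values unchanged.
fn chain_groups(first: &Groups, second: &Groups) -> Groups {
    let mut out = first.clone();
    for (key, values) in second {
        // Append the second input's values after any values already present.
        out.entry(*key).or_default().extend(values.iter().copied());
    }
    out
}
```

Calling `chain_groups` with `{0: [2], 1: [4]}` as the first input and `{0: [1], 1: [3]}` as the second yields `{0: [2, 1], 1: [4, 3]}`, the same grouping as the example.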
Source§

impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
where L: Location<'a>,

Source

pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R>

Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream, which will stream all the elements across all tick iterations by concatenating the batches for each key.
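The per-key concatenation across ticks can be modeled in plain Rust as follows. This is a sketch of the described semantics only, not of the asynchronous runtime behavior, and it does not use the Hydro API. The name `concat_ticks` is hypothetical, and each inner `Vec` stands for the keyed batch produced by one tick.

```rust
use std::collections::BTreeMap;

/// Plain-Rust model: given one batch of (key, value) pairs per tick,
/// builds the per-key sequences obtained by concatenating the batches
/// in tick order.
fn concat_ticks(ticks: &[Vec<(u32, i32)>]) -> BTreeMap<u32, Vec<i32>> {
    let mut out: BTreeMap<u32, Vec<i32>> = BTreeMap::new();
    for batch in ticks {
        for &(key, value) in batch {
            // Values for a key accumulate in the order the ticks emit them.
            out.entry(key).or_default().push(value);
        }
    }
    out
}
```

Ticks that produce no elements contribute nothing, and a key that first appears in a later tick simply starts its group at that point.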

Source

pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R>

Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream, which will stream all the elements across all tick iterations by concatenating the batches for each key.

Unlike KeyedStream::all_ticks, this preserves synchronous execution, as the output stream is emitted in an Atomic context that will process elements synchronously with the input stream’s Tick context.

Source

pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R>

Trait Implementations§

Source§

impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries> Clone for KeyedStream<K, V, Loc, Bound, Order, R>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value.
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>> for KeyedStream<K, V, L, B, NoOrder, R>
where L: Location<'a>,

Source§

fn from( stream: KeyedStream<K, V, L, B, TotalOrder, R>, ) -> KeyedStream<K, V, L, B, NoOrder, R>

Converts to this type from the input type.
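This conversion only weakens the ordering guarantee carried in the type; the reverse direction has no such impl. The pattern can be sketched with local marker types and `PhantomData`. Everything below is a self-contained model: `Tagged`, `ordered`, and `count_items` are hypothetical names, and the local `TotalOrder` and `NoOrder` are stand-ins rather than the crate's types.

```rust
use std::marker::PhantomData;

// Local stand-ins for the ordering markers (not the crate's own types).
struct TotalOrder;
struct NoOrder;

/// Model of a collection whose ordering guarantee lives in the type.
struct Tagged<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

impl<T> Tagged<T, TotalOrder> {
    /// Wraps items whose order is considered deterministic.
    fn ordered(items: Vec<T>) -> Self {
        Tagged { items, _order: PhantomData }
    }
}

/// Forgetting the guarantee is always safe, so it is offered through `From`.
impl<T> From<Tagged<T, TotalOrder>> for Tagged<T, NoOrder> {
    fn from(stream: Tagged<T, TotalOrder>) -> Self {
        Tagged { items: stream.items, _order: PhantomData }
    }
}

/// An order-insensitive consumer that accepts only the weaker type.
fn count_items<T>(stream: Tagged<T, NoOrder>) -> usize {
    stream.items.len()
}
```

A caller holding a `Tagged<_, TotalOrder>` can pass it to `count_items` after calling `.into()`. Because only the weakening direction is implemented, a value marked `NoOrder` cannot be converted back through `From` in this model.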

Auto Trait Implementations§

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Freeze for KeyedStream<K, V, Loc, Bound, Order, Retry>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !RefUnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retry>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Send for KeyedStream<K, V, Loc, Bound, Order, Retry>

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Sync for KeyedStream<K, V, Loc, Bound, Order, Retry>

§

impl<K, V, Loc, Bound, Order, Retry> Unpin for KeyedStream<K, V, Loc, Bound, Order, Retry>
where Loc: Unpin, Order: Unpin, Bound: Unpin, Retry: Unpin, K: Unpin, V: Unpin,

§

impl<K, V, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !UnwindSafe for KeyedStream<K, V, Loc, Bound, Order, Retry>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
Source§

impl<T> DynClone for T
where T: Clone,

Source§

fn __clone_box(&self, _: Private) -> *mut ()

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow only if both self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
§

impl<T> ErasedDestructor for T
where T: 'static,