pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> { /* private fields */ }
Mapping from keys of type K to values of type V.
Keyed singletons capture an asynchronously updated mapping from keys of type K to values of
type V, where the order of keys is non-deterministic. In addition to the standard boundedness
variants (Bounded for finite and immutable, Unbounded for asynchronously changing),
keyed singletons can use BoundedValue to declare that new keys may be added over time, but
keys cannot be removed and the value for each key is immutable.
Type Parameters:
- K: the type of the key for each entry
- V: the type of the value for each entry
- Loc: the Location where the keyed singleton is materialized
- Bound: tracks whether the entries are:
  - Bounded (local and finite)
  - Unbounded (asynchronous with entries added / removed / changed over time)
  - BoundedValue (asynchronous with immutable values for each key and no removals)
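To make the BoundedValue guarantee concrete, here is a minimal sketch in plain Rust over std::collections::HashMap. It is a model of the semantics only, not Hydro code: the GrowOnlyMap type and its methods are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of `BoundedValue` semantics: new keys may be added,
/// but an existing key is never overwritten and there is no removal method.
pub struct GrowOnlyMap<K, V> {
    entries: HashMap<K, V>,
}

impl<K: Hash + Eq, V> GrowOnlyMap<K, V> {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Adds a new key. Returns `false` and changes nothing if the key exists,
    /// which keeps the value for each key immutable.
    pub fn insert_new(&mut self, key: K, value: V) -> bool {
        if self.entries.contains_key(&key) {
            return false;
        }
        self.entries.insert(key, value);
        true
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries.get(key)
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

In this model, a second insert_new for the same key is rejected, so any reader that has seen a value for a key can rely on it never changing.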
Implementations§
impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
pub fn map<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, U, L, B>
where
    F: Fn(V) -> U + 'a,
Transforms each value by invoking f on each element, with keys staying the same
after transformation. If you need access to the key, see KeyedSingleton::map_with_key.
If you only want to view each value without modifying the keyed singleton,
use KeyedSingleton::inspect instead.
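The value-only transformation described above can be modeled in plain Rust. This is a sketch over std::collections::HashMap, not the Hydro operator; map_values is a hypothetical helper name.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: applies `f` to every value and leaves keys untouched.
pub fn map_values<K: Hash + Eq, V, U>(
    input: HashMap<K, V>,
    f: impl Fn(V) -> U,
) -> HashMap<K, U> {
    input.into_iter().map(|(k, v)| (k, f(v))).collect()
}
```

Because the closure never sees the key, the set of keys in the output is always identical to the set of keys in the input.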
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton.map(q!(|v| v + 1))
// { 1: 3, 2: 5 }
pub fn map_with_key<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, U, L, B>
Transforms each value by invoking f on each key-value pair, with keys staying the same
after transformation. Unlike KeyedSingleton::map, this gives access to both the key and value.
The closure f receives a tuple (K, V) containing both the key and value, and returns
the new value U. The key remains unchanged in the output.
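As a rough model of the key-aware variant, the sketch below passes a (K, V) tuple to the closure and reattaches the original key to the result. It uses std::collections::HashMap rather than Hydro types; map_values_with_key is a hypothetical name, and the K: Clone bound is an assumption of this model.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: `f` receives `(key, value)` and returns the new value.
/// The key is cloned for the closure so the original can be kept in the output.
pub fn map_values_with_key<K: Hash + Eq + Clone, V, U>(
    input: HashMap<K, V>,
    f: impl Fn((K, V)) -> U,
) -> HashMap<K, U> {
    input
        .into_iter()
        .map(|(k, v)| {
            let new_value = f((k.clone(), v));
            (k, new_value)
        })
        .collect()
}
```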
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton.map_with_key(q!(|(k, v)| k + v))
// { 1: 3, 2: 6 }
pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound>
Gets the number of keys in the keyed singleton.
The output singleton will be unbounded if the input is Unbounded or BoundedValue,
since keys may be added / removed over time. When the set of keys changes, the count will
be asynchronously updated.
§Example
let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
keyed_singleton.key_count()
// 3
pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
Converts this keyed singleton into a Singleton containing a HashMap from keys to values.
As the values for each key are updated asynchronously, the HashMap will be updated
asynchronously as well.
§Example
let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
keyed_singleton.into_singleton()
// { 1: "a", 2: "b", 3: "c" }
pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B>
An operator which allows you to “name” a HydroNode.
This is only used for testing, to correlate certain HydroNodes with IDs.
impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
pub fn entries(
    self,
) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce>
Flattens the keyed singleton into an unordered stream of key-value pairs.
The value for each key must be bounded, otherwise the resulting stream elements would be non-deterministic. As new entries are added to the keyed singleton, they will be streamed into the output.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton.entries()
// (1, 2), (2, 4) in any order
pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
Flattens the keyed singleton into an unordered stream of just the values.
The value for each key must be bounded, otherwise the resulting stream elements would be non-deterministic. As new entries are added to the keyed singleton, they will be streamed into the output.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton.values()
// 2, 4 in any order
pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
Flattens the keyed singleton into an unordered stream of just the keys.
The value for each key must be bounded, otherwise the removal of keys would result in non-determinism. As new entries are added to the keyed singleton, they will be streamed into the output.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton.keys()
// 1, 2 in any order
pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
    self,
    other: Stream<K, L, Bounded, O2, R2>,
) -> Self
Given a bounded stream of keys K, returns a new keyed singleton containing only the
entries whose keys are not in the provided stream.
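The anti-join behavior described above can be sketched in plain Rust. This is a model over std collections, not the Hydro operator: the function mirrors the method name for readability, and collecting the excluded keys into a HashSet is an assumption of this sketch.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Model of `filter_key_not_in`: keeps only entries whose key does not
/// appear in `exclude`. Duplicate keys in `exclude` have no extra effect.
pub fn filter_key_not_in<K: Hash + Eq, V>(
    input: HashMap<K, V>,
    exclude: impl IntoIterator<Item = K>,
) -> HashMap<K, V> {
    // The excluded keys must be fully known before filtering,
    // which is why the real operator takes a bounded stream.
    let exclude: HashSet<K> = exclude.into_iter().collect();
    input
        .into_iter()
        .filter(|(k, _)| !exclude.contains(k))
        .collect()
}
```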
§Example
let tick = process.tick();
let keyed_singleton = // { 1: 2, 2: 4 }
let keys_to_remove = process
.source_iter(q!(vec![1]))
.batch(&tick, nondet!(/** test */));
keyed_singleton.filter_key_not_in(keys_to_remove)
// { 2: 4 }
pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
An operator which allows you to “inspect” each value of a keyed singleton without
modifying it. The closure f is called on a reference to each value. This is
mainly useful for debugging, and should not be used to generate side-effects.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton
.inspect(q!(|v| println!("{}", v)))
// { 1: 2, 2: 4 }
pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
An operator which allows you to “inspect” each entry of a keyed singleton without
modifying it. The closure f is called on a reference to each key-value pair. This is
mainly useful for debugging, and should not be used to generate side-effects.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton
.inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
// { 1: 2, 2: 4 }
pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
where
    K: Ord,
Gets the key-value tuple with the largest key among all entries in this KeyedSingleton.
Because this method requires values to be bounded, the output Optional will only be
asynchronously updated if a new key is added that is higher than the previous max key.
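The update rule described above (the output changes only when a strictly larger key arrives) can be sketched as a small tracker in plain Rust. MaxKeyTracker and its methods are hypothetical names for illustration, not part of the Hydro API.

```rust
/// Hypothetical model of `get_max_key` over entries that arrive one at a time.
pub struct MaxKeyTracker<K, V> {
    current: Option<(K, V)>,
}

impl<K: Ord, V> MaxKeyTracker<K, V> {
    pub fn new() -> Self {
        Self { current: None }
    }

    /// Observes a newly added entry. Returns `true` only when the maximum
    /// changes, i.e. when `key` is larger than the previous max key.
    pub fn observe(&mut self, key: K, value: V) -> bool {
        let is_new_max = match &self.current {
            Some((max_key, _)) => key > *max_key,
            None => true,
        };
        if is_new_max {
            self.current = Some((key, value));
        }
        is_new_max
    }

    pub fn current(&self) -> Option<&(K, V)> {
        self.current.as_ref()
    }
}
```

Since values are immutable once a key exists, the tracker never has to revisit an entry it has already seen.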
§Example
let tick = process.tick();
let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
keyed_singleton.get_max_key()
// (2, 456)
pub fn into_keyed_stream(
    self,
) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce>
Converts this keyed singleton into a KeyedStream with each group having a single
element, the value.
This is the equivalent of Singleton::into_stream but keyed.
§Example
let keyed_singleton = // { 1: 2, 2: 4 }
keyed_singleton
.clone()
.into_keyed_stream()
.interleave(
keyed_singleton.into_keyed_stream()
)
// { 1: [2, 2], 2: [4, 4] }
impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded>
pub fn get(
    self,
    key: Singleton<K, Tick<L>, Bounded>,
) -> Optional<V, Tick<L>, Bounded>
Gets the value associated with a specific key from the keyed singleton.
§Example
let tick = process.tick();
let keyed_data = process
.source_iter(q!(vec![(1, 2), (2, 3)]))
.into_keyed()
.batch(&tick, nondet!(/** test */))
.first();
let key = tick.singleton(q!(1));
keyed_data.get(key).all_ticks()
// 2
pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
    self,
    requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2>
Given a keyed stream of lookup requests, where the key is the lookup key and the value is some additional metadata, emits a keyed stream of lookup results. Each result keeps the request's key, and its value is a tuple of the looked-up value and the request's metadata. If the key is not found, no output is produced for that request.
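A plain-Rust model of this lookup is sketched below. It uses a HashMap for the keyed singleton and a Vec of (key, metadata) pairs for the requests; these representations, and the V: Clone bound, are assumptions of the sketch rather than part of the Hydro API.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Model of `get_many_if_present`: pairs each request's metadata with the
/// value stored under the request's key, and drops requests whose key is missing.
pub fn get_many_if_present<K: Hash + Eq, V: Clone, V2>(
    data: &HashMap<K, V>,
    requests: impl IntoIterator<Item = (K, V2)>,
) -> Vec<(K, (V, V2))> {
    requests
        .into_iter()
        .filter_map(|(k, meta)| data.get(&k).cloned().map(|v| (k, (v, meta))))
        .collect()
}
```

This model happens to preserve request order because it iterates a Vec; the real operator's output is declared NoOrder, so callers should not rely on any ordering.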
§Example
let tick = process.tick();
let keyed_data = process
.source_iter(q!(vec![(1, 10), (2, 20)]))
.into_keyed()
.batch(&tick, nondet!(/** test */))
.first();
let other_data = process
.source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
.into_keyed()
.batch(&tick, nondet!(/** test */));
keyed_data.get_many_if_present(other_data).entries().all_ticks()
// { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
pub fn get_from<V2: Clone>(
    self,
    from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
For each entry in self, looks up the entry in from whose key matches the value of the
entry in self. The output is a keyed singleton with tuple values containing the value from
self and an Option of the value from from. If the key is not present in from, the option
will be None.
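The indirection can be easier to follow in code. The sketch below models both keyed singletons as std HashMaps; it illustrates the lookup rule only and is not the Hydro implementation.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Model of `get_from`: the *value* of each entry in `requests` is used as
/// the *key* into `from`. Missing keys produce `None` instead of dropping the entry.
pub fn get_from<K: Hash + Eq, V: Hash + Eq, V2: Clone>(
    requests: HashMap<K, V>,
    from: &HashMap<V, V2>,
) -> HashMap<K, (V, Option<V2>)> {
    requests
        .into_iter()
        .map(|(k, v)| {
            let found = from.get(&v).cloned();
            (k, (v, found))
        })
        .collect()
}
```

Unlike a lookup that skips missing keys, every key of the input appears in the output, which is why the looked-up value is wrapped in an Option.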
§Example
let requests = // { 1: 10, 2: 20 }
let other_data = // { 10: 100, 11: 101 }
requests.get_from(other_data)
// { 1: (10, Some(100)), 2: (20, None) }
impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
where
    L: Location<'a>,
pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B>
Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
will all be executed synchronously before any outputs are yielded (in KeyedSingleton::end_atomic).
This is useful to enforce local consistency constraints, such as ensuring that a write is
processed before an acknowledgement is emitted. Entering an atomic section requires a Tick
argument that declares where the keyed singleton will be atomically processed. Batching a
keyed singleton into the same Tick will preserve the synchronous execution, while
batching into a different Tick will introduce asynchrony.
impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B>
Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
See KeyedSingleton::atomic for more details.
impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded>
pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded>
Shifts the state in self to the next tick, so that the returned keyed singleton at
tick T always has the entries of self at tick T - 1.
At tick 0, the output has no entries, since there is no previous tick.
This operator enables stateful iterative processing with ticks, by sending data from one tick to the next. For example, you can use it to compare state across consecutive batches.
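The one-tick shift can be modeled in plain Rust by treating the state at each tick as one HashMap in a Vec. This is a sketch of the semantics only: the Vec-of-maps representation and the new_entries_per_tick helper are assumptions introduced for illustration.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Model of `defer_tick`: output at tick T is the input at tick T - 1,
/// and tick 0 sees an empty map.
pub fn defer_tick<K, V>(ticks: Vec<HashMap<K, V>>) -> Vec<HashMap<K, V>> {
    let n = ticks.len();
    std::iter::once(HashMap::new()).chain(ticks).take(n).collect()
}

/// Hypothetical use: per tick, keep only entries whose key was absent in the previous tick.
pub fn new_entries_per_tick<K: Hash + Eq + Clone, V: Clone>(
    ticks: &[HashMap<K, V>],
) -> Vec<HashMap<K, V>> {
    let previous = defer_tick(ticks.to_vec());
    ticks
        .iter()
        .zip(previous.iter())
        .map(|(cur, prev)| {
            cur.iter()
                .filter(|(k, _)| !prev.contains_key(*k))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        })
        .collect()
}
```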
§Example
let tick = process.tick();
let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
input_batch.clone().filter_key_not_in(
input_batch.defer_tick().keys() // keys present in the previous tick
)
// { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
where
    L: Location<'a>,
pub fn snapshot(
    self,
    tick: &Tick<L>,
    _nondet: NonDet,
) -> KeyedSingleton<K, V, Tick<L>, Bounded>
Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic point in time.
§Non-Determinism
Because this picks a snapshot of each entry, which is continuously changing, each output has a non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
pub fn snapshot_atomic(
    self,
    _nondet: NonDet,
) -> KeyedSingleton<K, V, Tick<L>, Bounded>
Returns a keyed singleton with a snapshot of each key-value entry, consistent with the state of the keyed singleton being atomically processed.
§Non-Determinism
Because this picks a snapshot of each entry, which is continuously changing, each output has a non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
where
    L: Location<'a>,
pub fn filter<F>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, V, L, B>
Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate f.
The closure f receives a reference &V to each value and returns a boolean. If the predicate
returns true, the key-value pair is included in the output. If it returns false, the pair
is filtered out.
The closure f receives a reference &V rather than an owned value V because filtering does
not modify or take ownership of the values. If you need to modify the values while filtering
use KeyedSingleton::filter_map instead.
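The by-reference predicate can be sketched in plain Rust as follows. This models the semantics with a std HashMap; filter_values is a hypothetical helper name, not a Hydro function.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: keeps entries whose value satisfies `pred`.
/// The predicate borrows the value, so kept entries pass through unchanged.
pub fn filter_values<K: Hash + Eq, V>(
    input: HashMap<K, V>,
    pred: impl Fn(&V) -> bool,
) -> HashMap<K, V> {
    input.into_iter().filter(|(_, v)| pred(v)).collect()
}
```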
§Example
let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
keyed_singleton.filter(q!(|&v| v > 1))
// { 1: 2, 2: 4 }
pub fn filter_map<F, U>(
    self,
    f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, U, L, B>
An operator that both filters and maps values. It yields only the key-value pairs where
the supplied closure f returns Some(value).
The closure f receives each value V and returns Option<U>. If the closure returns
Some(new_value), the key-value pair (key, new_value) is included in the output.
If it returns None, the key-value pair is filtered out.
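A plain-Rust model of the combined filter and map is sketched below, again over a std HashMap. The helper name filter_map_values is hypothetical.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: `f` consumes each value and returns `Some(new_value)`
/// to keep the entry under the same key, or `None` to drop it.
pub fn filter_map_values<K: Hash + Eq, V, U>(
    input: HashMap<K, V>,
    f: impl Fn(V) -> Option<U>,
) -> HashMap<K, U> {
    input
        .into_iter()
        .filter_map(|(k, v)| f(v).map(|u| (k, u)))
        .collect()
}
```

Because f takes ownership of the value, it can change the value type, for example from a string to a parsed integer.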
§Example
let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
// { 1: 42, 3: 100 }
pub fn batch(
    self,
    tick: &Tick<L>,
    nondet: NonDet,
) -> KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: NoTick,
Returns a keyed singleton with entries consisting of new key-value pairs that have arrived since the previous batch was released.
Currently, there is no all_ticks dual on KeyedSingleton. Instead, you may want to use
KeyedSingleton::into_keyed_stream and then yield with KeyedStream::all_ticks.
§Non-Determinism
Because this picks a batch of asynchronously added entries, each output keyed singleton has a non-deterministic set of key-value pairs.
impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
pub fn batch_atomic(
    self,
    nondet: NonDet,
) -> KeyedSingleton<K, V, Tick<L>, Bounded>
Returns a keyed singleton with entries consisting of new key-value pairs that are being atomically processed.
Currently, there is no dual to asynchronously yield back outside the tick. Instead, you
should use KeyedSingleton::into_keyed_stream and yield a KeyedStream.
§Non-Determinism
Because this picks a batch of asynchronously added entries, each output keyed singleton has a non-deterministic set of key-value pairs.
Trait Implementations§
Auto Trait Implementations§
impl<K, V, Loc, Bound> !Freeze for KeyedSingleton<K, V, Loc, Bound>
impl<K, V, Loc, Bound> !RefUnwindSafe for KeyedSingleton<K, V, Loc, Bound>
impl<K, V, Loc, Bound> !Send for KeyedSingleton<K, V, Loc, Bound>
impl<K, V, Loc, Bound> !Sync for KeyedSingleton<K, V, Loc, Bound>
impl<K, V, Loc, Bound> Unpin for KeyedSingleton<K, V, Loc, Bound>
impl<K, V, Loc, Bound> !UnwindSafe for KeyedSingleton<K, V, Loc, Bound>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> ToSinkBuild for T
fn iter_to_sink_build(self) -> SendIterBuild<Self>
fn stream_to_sink_build(self) -> SendStreamBuild<Self>
where
    Self: Sized + Stream,
Starts a SinkBuild adaptor chain to send all items from self as a Stream.