pub struct Singleton<Type, Loc, Bound: Boundedness> { /* private fields */ }
A single Rust value that can asynchronously change over time.
If the singleton is Bounded, the value is frozen and will not change. But if it is Unbounded, the value will asynchronously change over time.
Singletons are often used to capture state in a Hydro program, such as an event counter which is a single number that will asynchronously change as events are processed. Singletons also appear when dealing with bounded collections, to perform regular Rust computations on concrete values, such as getting the length of a batch of requests.
Type Parameters:
Implementations§
impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
where
    L: Location<'a>,
pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
where
    F: Fn(T) -> U + 'a,
Transforms the singleton value by applying a function f to it, continuously as the input is updated.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(5));
singleton.map(q!(|v| v * 2)).all_ticks()
// 10
pub fn flat_map_ordered<U, I, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
where
    I: IntoIterator<Item = U>,
    F: Fn(T) -> I + 'a,
Transforms the singleton value by applying a function f to it and then flattening the result into a stream, preserving the order of elements.
The function f is applied to the singleton value to produce an iterator, and all items from that iterator are emitted in the output stream in deterministic order.
The implementation of Iterator for the output type I must produce items in a deterministic order. For example, I could be a Vec, but not a HashSet. If the order is not deterministic, use Singleton::flat_map_unordered instead.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(vec![1, 2, 3]));
singleton.flat_map_ordered(q!(|v| v)).all_ticks()
// 1, 2, 3
pub fn flat_map_unordered<U, I, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder, ExactlyOnce>
where
    I: IntoIterator<Item = U>,
    F: Fn(T) -> I + 'a,
Like Singleton::flat_map_ordered, but allows the implementation of Iterator for the output type I to produce items in any order.
The function f is applied to the singleton value to produce an iterator, and all items from that iterator are emitted in the output stream in non-deterministic order.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(
std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
singleton.flat_map_unordered(q!(|v| v)).all_ticks()
// 1, 2, 3, but in no particular order
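The difference between the ordered and unordered variants comes down to whether the source collection iterates in a stable order. The following is a minimal plain-Rust sketch of that distinction using only the standard library; it does not use Hydro, and the helper names `ordered_items` and `unordered_items` are hypothetical stand-ins rather than part of the API.

```rust
use std::collections::{BTreeSet, HashSet};

/// Hypothetical helper: flattens a Vec. The output order is exactly the
/// order in which the Vec stores its elements, so it is deterministic.
fn ordered_items(v: Vec<i32>) -> Vec<i32> {
    v.into_iter().collect()
}

/// Hypothetical helper: flattens a HashSet. The iteration order of a
/// HashSet is unspecified, so only the set of elements is meaningful.
fn unordered_items(s: HashSet<i32>) -> Vec<i32> {
    s.into_iter().collect()
}

fn main() {
    // A Vec can be compared position by position.
    assert_eq!(ordered_items(vec![1, 2, 3]), vec![1, 2, 3]);

    // A HashSet should only be compared as a set of elements.
    let items = unordered_items(HashSet::from([1, 2, 3]));
    let as_set: BTreeSet<i32> = items.into_iter().collect();
    assert_eq!(as_set, BTreeSet::from([1, 2, 3]));
}
```

Downstream logic that consumes the unordered output should likewise avoid depending on element positions.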
pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
where
    T: IntoIterator<Item = U>,
Flattens the singleton value into a stream, preserving the order of elements.
The singleton value must implement IntoIterator, and all items from that iterator are emitted in the output stream in deterministic order.
The implementation of Iterator for the element type T must produce items in a deterministic order. For example, T could be a Vec, but not a HashSet. If the order is not deterministic, use Singleton::flatten_unordered instead.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(vec![1, 2, 3]));
singleton.flatten_ordered().all_ticks()
// 1, 2, 3
pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
where
    T: IntoIterator<Item = U>,
Like Singleton::flatten_ordered, but allows the implementation of Iterator for the element type T to produce items in any order.
The singleton value must implement IntoIterator, and all items from that iterator are emitted in the output stream in non-deterministic order.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(
std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
singleton.flatten_unordered().all_ticks()
// 1, 2, 3, but in no particular order
pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
Creates an optional containing the singleton value if it satisfies a predicate f.
If the predicate returns true, the output optional contains the same value. If the predicate returns false, the output optional is empty.
The closure f receives a reference &T rather than an owned value T because filtering does not modify or take ownership of the value. If you need to modify the value while filtering, use Singleton::filter_map instead.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!(5));
singleton.filter(q!(|&x| x > 3)).all_ticks()
// 5
pub fn filter_map<U, F>(
    self,
    f: impl IntoQuotedMut<'a, F, L>,
) -> Optional<U, L, B>
An operator that both filters and maps. It yields the value only if the supplied closure f returns Some(value).
If the closure returns Some(new_value), the output optional contains new_value. If the closure returns None, the output optional is empty.
§Example
let tick = process.tick();
let singleton = tick.singleton(q!("42"));
singleton
.filter_map(q!(|s| s.parse::<i32>().ok()))
.all_ticks()
// 42
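The contrast between filter (predicate borrows the value) and filter_map (closure consumes the value and may change its type) can be modelled with a plain `Option`. This is only an analogy written against the standard library, not Hydro code; `keep_if` and `keep_and_map` are hypothetical helper names.

```rust
/// Hypothetical helper: keeps the value only if the predicate accepts it.
/// The predicate borrows the value, mirroring the &T argument of filter.
fn keep_if<T>(value: T, pred: impl Fn(&T) -> bool) -> Option<T> {
    if pred(&value) {
        Some(value)
    } else {
        None
    }
}

/// Hypothetical helper: filters and transforms in one step. The closure
/// takes ownership and may return a different type, mirroring filter_map.
fn keep_and_map<T, U>(value: T, f: impl Fn(T) -> Option<U>) -> Option<U> {
    f(value)
}

fn main() {
    // The predicate decides presence; the value itself is unchanged.
    assert_eq!(keep_if(5, |&x| x > 3), Some(5));
    assert_eq!(keep_if(2, |&x| x > 3), None);

    // The closure decides presence and produces the new value.
    assert_eq!(keep_and_map("42", |s| s.parse::<i32>().ok()), Some(42));
    assert_eq!(keep_and_map("abc", |s| s.parse::<i32>().ok()), None);
}
```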
pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
where
    Self: ZipResult<'a, O, Location = L>,
Combines this singleton with another Singleton or Optional by tupling their values.
If the other value is a Singleton, the output will be a Singleton, but if it is an Optional, the output will be an Optional that is non-null only if the argument is non-null. This is useful for combining several pieces of state together.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![123, 456]))
.batch(&tick, nondet!(/** test */));
let count = numbers.clone().count(); // Singleton
let max = numbers.max(); // Optional
count.zip(max).all_ticks()
// [(2, 456)]
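The rule for the output kind (always present when zipping with a Singleton, possibly absent when zipping with an Optional) can be sketched in plain Rust with tuples and `Option`. This is a standard-library analogy under that assumption, not the Hydro implementation; `zip_singletons` and `zip_with_optional` are hypothetical names.

```rust
/// Hypothetical helper: zipping two always-present values always yields a pair.
fn zip_singletons<A, B>(a: A, b: B) -> (A, B) {
    (a, b)
}

/// Hypothetical helper: zipping with a possibly-absent value yields a pair
/// only when that value is present.
fn zip_with_optional<A, B>(a: A, b: Option<B>) -> Option<(A, B)> {
    b.map(|b| (a, b))
}

fn main() {
    let numbers = vec![123, 456];
    let count = numbers.len(); // always defined, even for an empty batch
    let max = numbers.iter().copied().max(); // None for an empty batch
    assert_eq!(zip_with_optional(count, max), Some((2, 456)));

    // With an empty batch the count is still 0, but there is no maximum,
    // so the zipped result is absent.
    let empty: Vec<i32> = vec![];
    assert_eq!(
        zip_with_optional(empty.len(), empty.iter().copied().max()),
        None
    );

    assert_eq!(zip_singletons(2, "two"), (2, "two"));
}
```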
pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
Filters this singleton into an Optional, passing through the singleton value if the argument (a Bounded Optional) is non-null, otherwise the output is null.
Useful for conditional processing, such as only emitting a singleton’s value outside a tick if some other condition is satisfied.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![1]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![1, 2, 3]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).count()
.filter_if_some(some_on_first_tick)
.all_ticks()
// [1]
pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
Filters this singleton into an Optional, passing through the singleton value if the argument (a Bounded Optional) is null, otherwise the output is null.
Like Singleton::filter_if_some, this is useful for conditional processing, but inverts the condition.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![1]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![1, 2, 3]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).count()
.filter_if_none(some_on_first_tick)
.all_ticks()
// [3]
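The two examples above differ only in which tick passes its count through. A rough plain-Rust model of that behaviour, assuming one count and one signal per tick, is shown here; it does not use Hydro, and `pass_if_some` and `pass_if_none` are hypothetical stand-ins for the operators.

```rust
/// Hypothetical helper: passes `value` through only when `signal` is present.
fn pass_if_some<T, U>(value: T, signal: Option<U>) -> Option<T> {
    signal.map(|_| value)
}

/// Hypothetical helper: passes `value` through only when `signal` is absent.
fn pass_if_none<T, U>(value: T, signal: Option<U>) -> Option<T> {
    match signal {
        Some(_) => None,
        None => Some(value),
    }
}

fn main() {
    // Per-tick counts, and a signal that is present only on the first tick.
    let counts = [1usize, 3];
    let signals = [Some(()), None];

    let when_some: Vec<usize> = counts
        .iter()
        .zip(signals.iter())
        .filter_map(|(&c, &s)| pass_if_some(c, s))
        .collect();
    let when_none: Vec<usize> = counts
        .iter()
        .zip(signals.iter())
        .filter_map(|(&c, &s)| pass_if_none(c, s))
        .collect();

    assert_eq!(when_some, vec![1]); // only the first tick's count survives
    assert_eq!(when_none, vec![3]); // only the second tick's count survives
}
```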
pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B>
An operator which allows you to “name” a HydroNode.
This is only used for testing, to correlate certain HydroNodes with IDs.
impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
Returns a singleton value corresponding to the latest snapshot of the singleton being atomically processed. The snapshot at tick t + 1 is guaranteed to include at least all relevant data that contributed to the snapshot at tick t. Furthermore, all snapshots of this singleton into the atomic-associated tick will observe the same value each tick.
§Non-Determinism
Because this picks a snapshot of a singleton whose value is continuously changing, the output singleton has a non-deterministic value since the snapshot can be at an arbitrary point in time.
pub fn end_atomic(self) -> Optional<T, L, B>
Returns this singleton back into a top-level, asynchronous execution context where updates to the value will be asynchronously propagated.
impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B>
Shifts this singleton into an atomic context, which guarantees that any downstream logic will observe the same version of the value and will be executed synchronously before any outputs are yielded (in Optional::end_atomic).
This is useful to enforce local consistency constraints, such as ensuring that several readers see a consistent version of local state (since otherwise each Singleton::snapshot may pick a different version).
Entering an atomic section requires a Tick argument that declares where the singleton will be atomically processed. Snapshotting a singleton into the same Tick will preserve the synchronous execution, and all such snapshots in the same Tick will have the same value.
pub fn snapshot(
    self,
    tick: &Tick<L>,
    nondet: NonDet,
) -> Singleton<T, Tick<L>, Bounded>
where
    L: NoTick,
Given a tick, returns a singleton value corresponding to a snapshot of the singleton as of that tick. The snapshot at tick t + 1 is guaranteed to include at least all relevant data that contributed to the snapshot at tick t.
§Non-Determinism
Because this picks a snapshot of a singleton whose value is continuously changing, the output singleton has a non-deterministic value since the snapshot can be at an arbitrary point in time.
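One way to picture this guarantee is to treat the singleton as a running counter over a sequence of events, with each tick observing some prefix of them. The sketch below is a plain-Rust model under that assumption, not Hydro code; `snapshot_at` and `prefixes_are_monotone` are hypothetical names. Which prefix each tick sees may vary between runs, but the prefixes never shrink.

```rust
/// Hypothetical helper: value of a running counter after processing the
/// first `prefix_len` events (clamped to the number of events).
fn snapshot_at(events: &[u32], prefix_len: usize) -> u32 {
    events[..prefix_len.min(events.len())].iter().sum()
}

/// Hypothetical helper: checks that each tick sees at least the data seen
/// by the previous tick.
fn prefixes_are_monotone(prefix_lens: &[usize]) -> bool {
    prefix_lens.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    let events: [u32; 4] = [1, 1, 1, 1];

    // Two possible runs: the prefix observed at each tick differs, which is
    // the non-determinism, but both runs respect the monotonicity guarantee.
    let run_a: [usize; 3] = [1, 3, 4];
    let run_b: [usize; 3] = [2, 2, 4];
    assert!(prefixes_are_monotone(&run_a));
    assert!(prefixes_are_monotone(&run_b));

    let values_a: Vec<u32> = run_a.iter().map(|&k| snapshot_at(&events, k)).collect();
    let values_b: Vec<u32> = run_b.iter().map(|&k| snapshot_at(&events, k)).collect();
    assert_eq!(values_a, vec![1, 3, 4]);
    assert_eq!(values_b, vec![2, 2, 4]);
}
```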
pub fn sample_eager(
    self,
    nondet: NonDet,
) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
Eagerly samples the singleton as fast as possible, returning a stream of snapshots with order corresponding to increasing prefixes of data contributing to the singleton.
§Non-Determinism
At runtime, the singleton will be arbitrarily sampled as fast as possible, but due to non-deterministic batching and arrival of inputs, the output stream is non-deterministic.
pub fn sample_every(
    self,
    interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a,
    nondet: NonDet,
) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
Given a time interval, returns a stream corresponding to snapshots of the singleton value taken at various points in time. Because the input singleton may be Unbounded, there are no guarantees on what these snapshots are other than they represent the value of the singleton given some prefix of the streams leading up to it.
§Non-Determinism
The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.
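As a loose illustration of clock-driven sampling, the sketch below models the singleton as a list of values over discrete time steps and samples it at a fixed step interval. This is a plain-Rust analogy with a hypothetical helper `sample_every_n`, not the Hydro operator, and the discrete timeline is an assumption made for the example. It shows that samples can repeat a value and can skip intermediate values.

```rust
/// Hypothetical helper: samples `timeline` (the value at each time step)
/// every `interval` steps, starting at step 0.
/// `interval` must be non-zero, because `step_by(0)` panics.
fn sample_every_n(timeline: &[u32], interval: usize) -> Vec<u32> {
    timeline.iter().copied().step_by(interval).collect()
}

fn main() {
    // The value of a counter observed at eight consecutive time steps.
    let timeline = [0u32, 1, 1, 2, 5, 5, 5, 9];

    // Sampling every 2 steps reads steps 0, 2, 4, 6: the value 5 is seen
    // twice and the value 2 is never seen.
    assert_eq!(sample_every_n(&timeline, 2), vec![0, 1, 5, 5]);

    // A different interval observes a different set of values.
    assert_eq!(sample_every_n(&timeline, 3), vec![0, 2, 5]);
}
```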
impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
where
    L: Location<'a>,
pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce>
Asynchronously yields the value of this singleton outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element.
Unlike Singleton::latest, the value computed in each tick is emitted separately, producing one element in the output for each tick. This is useful for batched computations, where the results from each tick must be combined together.
§Example
let tick = process.tick();
input_batch // first tick: [1], second tick: [1, 2, 3]
.count()
.all_ticks()
// [1, 3]
pub fn all_ticks_atomic(
    self,
) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce>
Synchronously yields the value of this singleton outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element.
Unlike Singleton::all_ticks, this preserves synchronous execution, as the output stream is emitted in an Atomic context that will process elements synchronously with the input singleton’s Tick context.
pub fn latest(self) -> Singleton<T, L, Unbounded>
Asynchronously yields this singleton outside the tick as an unbounded singleton, which will be asynchronously updated with the latest value of the singleton inside the tick.
This converts a bounded value inside a tick into an asynchronous value outside the tick that tracks the inner value. This is useful for getting the value as of the “most recent” tick, but note that updates are propagated asynchronously outside the tick.
§Example
let tick = process.tick();
input_batch // first tick: [1], second tick: [1, 2, 3]
.count()
.latest()
// asynchronously changes from 1 ~> 3
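The difference between all_ticks (one output element per tick) and latest (a single value that tracks the most recent tick) can be modelled in plain Rust by treating each tick's batch as a `Vec`. This is only a standard-library sketch of the semantics described above; `counts_per_tick` and `latest_count` are hypothetical helper names.

```rust
/// Hypothetical helper: one count per tick, the analogue of emitting each
/// tick's value as a separate stream element.
fn counts_per_tick(batches: &[Vec<i32>]) -> Vec<usize> {
    batches.iter().map(|b| b.len()).collect()
}

/// Hypothetical helper: only the most recent tick's count, the analogue of
/// a value that is overwritten as new ticks complete.
fn latest_count(batches: &[Vec<i32>]) -> Option<usize> {
    batches.last().map(|b| b.len())
}

fn main() {
    // first tick: [1], second tick: [1, 2, 3]
    let batches = vec![vec![1], vec![1, 2, 3]];

    assert_eq!(counts_per_tick(&batches), vec![1, 3]);
    assert_eq!(latest_count(&batches), Some(3));
}
```

An observer of the latest value may never see the intermediate count 1 if it only looks after the second tick, whereas the per-tick stream always contains both counts.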
pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded>
Synchronously yields this singleton outside the tick as an unbounded singleton, which will be updated with the latest value of the singleton inside the tick.
Unlike Singleton::latest, this preserves synchronous execution, as the output singleton is emitted in an Atomic context that will process elements synchronously with the input singleton’s Tick context.
pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded>
pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
Converts this singleton into a Stream containing a single element, the value.
§Example
let tick = process.tick();
let batch_input = process
.source_iter(q!(vec![123, 456]))
.batch(&tick, nondet!(/** test */));
batch_input.clone().chain(
batch_input.count().into_stream()
).all_ticks()
// [123, 456, 2]
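In plain Rust iterator terms, turning a single value into a one-element stream resembles `std::iter::once`, which can then be chained onto another iterator. The sketch below mirrors the example above using only the standard library; `with_count` is a hypothetical helper, not part of Hydro.

```rust
/// Hypothetical helper: appends the element count to the end of a batch as
/// a single extra element.
fn with_count(batch: &[usize]) -> Vec<usize> {
    batch
        .iter()
        .copied()
        // `once` wraps a single value as a one-element iterator.
        .chain(std::iter::once(batch.len()))
        .collect()
}

fn main() {
    assert_eq!(with_count(&[123, 456]), vec![123, 456, 2]);
}
```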
Trait Implementations§
impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
where
    L: Location<'a>,
fn defer_tick(self) -> Self
Auto Trait Implementations§
impl<Type, Loc, Bound> !Freeze for Singleton<Type, Loc, Bound>
impl<Type, Loc, Bound> !RefUnwindSafe for Singleton<Type, Loc, Bound>
impl<Type, Loc, Bound> !Send for Singleton<Type, Loc, Bound>
impl<Type, Loc, Bound> !Sync for Singleton<Type, Loc, Bound>
impl<Type, Loc, Bound> Unpin for Singleton<Type, Loc, Bound>
impl<Type, Loc, Bound> !UnwindSafe for Singleton<Type, Loc, Bound>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.