pub struct Optional<Type, Loc, Bound: Boundedness> { /* private fields */ }
Expand description
A nullable Rust value that can asynchronously change over time.
Optionals are the live collection equivalent of Option
. If the optional is Bounded
,
the value is frozen and will not change. But if it is Unbounded
, the value will
asynchronously change over time, including becoming present of uninhabited.
Optionals are used in many of the same places as Singleton
, but when the value may be
nullable. For example, the first element of a Stream
is exposed as an Optional
.
Type Parameters:
Implementations§
Source§impl<'a, T, L, B: Boundedness> Optional<T, L, B>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness> Optional<T, L, B>where
L: Location<'a>,
Sourcepub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>where
F: Fn(T) -> U + 'a,
pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>where
F: Fn(T) -> U + 'a,
Transforms the optional value by applying a function f
to it,
continuously as the input is updated.
Whenever the optional is empty, the output optional is also empty.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(1));
optional.map(q!(|v| v + 1)).all_ticks()
// 2
Sourcepub fn flat_map_ordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, TotalOrder, ExactlyOnce>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
pub fn flat_map_ordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, TotalOrder, ExactlyOnce>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
Transforms the optional value by applying a function f
to it and then flattening
the result into a stream, preserving the order of elements.
If the optional is empty, the output stream is also empty. If the optional contains
a value, f
is applied to produce an iterator, and all items from that iterator
are emitted in the output stream in deterministic order.
The implementation of Iterator
for the output type I
must produce items in a
deterministic order. For example, I
could be a Vec
, but not a HashSet
.
If the order is not deterministic, use Optional::flat_map_unordered
instead.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
optional.flat_map_ordered(q!(|v| v)).all_ticks()
// 1, 2, 3
Sourcepub fn flat_map_unordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder, ExactlyOnce>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
pub fn flat_map_unordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder, ExactlyOnce>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
Like Optional::flat_map_ordered
, but allows the implementation of Iterator
for the output type I
to produce items in any order.
If the optional is empty, the output stream is also empty. If the optional contains
a value, f
is applied to produce an iterator, and all items from that iterator
are emitted in the output stream in non-deterministic order.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(
std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
optional.flat_map_unordered(q!(|v| v)).all_ticks()
// 1, 2, 3, but in no particular order
Sourcepub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>where
T: IntoIterator<Item = U>,
pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>where
T: IntoIterator<Item = U>,
Flattens the optional value into a stream, preserving the order of elements.
If the optional is empty, the output stream is also empty. If the optional contains
a value that implements IntoIterator
, all items from that iterator are emitted
in the output stream in deterministic order.
The implementation of Iterator
for the element type T
must produce items in a
deterministic order. For example, T
could be a Vec
, but not a HashSet
.
If the order is not deterministic, use Optional::flatten_unordered
instead.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
optional.flatten_ordered().all_ticks()
// 1, 2, 3
Sourcepub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>where
T: IntoIterator<Item = U>,
pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>where
T: IntoIterator<Item = U>,
Like Optional::flatten_ordered
, but allows the implementation of Iterator
for the element type T
to produce items in any order.
If the optional is empty, the output stream is also empty. If the optional contains
a value that implements IntoIterator
, all items from that iterator are emitted
in the output stream in non-deterministic order.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(
std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
optional.flatten_unordered().all_ticks()
// 1, 2, 3, but in no particular order
Sourcepub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
Creates an optional containing only the value if it satisfies a predicate f
.
If the optional is empty, the output optional is also empty. If the optional contains
a value and the predicate returns true
, the output optional contains the same value.
If the predicate returns false
, the output optional is empty.
The closure f
receives a reference &T
rather than an owned value T
because filtering does
not modify or take ownership of the value. If you need to modify the value while filtering
use Optional::filter_map
instead.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!(5));
optional.filter(q!(|&x| x > 3)).all_ticks()
// 5
Sourcepub fn filter_map<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Optional<U, L, B>
pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Optional<U, L, B>
An operator that both filters and maps. It yields only the value if the supplied
closure f
returns Some(value)
.
If the optional is empty, the output optional is also empty. If the optional contains
a value and the closure returns Some(new_value)
, the output optional contains new_value
.
If the closure returns None
, the output optional is empty.
§Example
let tick = process.tick();
let optional = tick.optional_first_tick(q!("42"));
optional
.filter_map(q!(|s| s.parse::<i32>().ok()))
.all_ticks()
// 42
Sourcepub fn zip<O>(
self,
other: impl Into<Optional<O, L, B>>,
) -> Optional<(T, O), L, B>where
O: Clone,
pub fn zip<O>(
self,
other: impl Into<Optional<O, L, B>>,
) -> Optional<(T, O), L, B>where
O: Clone,
Combines this singleton with another Singleton
or Optional
by tupling their values.
If the other value is a Optional
, the output will be non-null only if the argument is
non-null. This is useful for combining several pieces of state together.
§Example
let tick = process.tick();
let numbers = process
.source_iter(q!(vec![123, 456, 789]))
.batch(&tick, nondet!(/** test */));
let min = numbers.clone().min(); // Optional
let max = numbers.max(); // Optional
min.zip(max).all_ticks()
// [(123, 789)]
Sourcepub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B>
pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B>
Passes through self
when it has a value, otherwise passes through other
.
Like Option::or
, this is helpful for defining a fallback for an Optional
, when the
fallback itself is an Optional
. If the fallback is a Singleton
, you can use
Optional::unwrap_or
to ensure that the output is always non-null.
If the inputs are Unbounded
, the output will be asynchronously updated as the contents
of the inputs change (including to/from null states).
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let some_first_tick = tick.optional_first_tick(q!(123));
let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
some_first_tick.or(some_second_tick).all_ticks()
// [123 /* first tick */, 456 /* second tick */]
Sourcepub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B>
pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B>
Gets the contents of self
when it has a value, otherwise passes through other
.
Like Option::unwrap_or
, this is helpful for defining a fallback for an Optional
.
If the fallback is not always defined (an Optional
), you can use Optional::or
.
If the inputs are Unbounded
, the output will be asynchronously updated as the contents
of the inputs change (including to/from null states).
§Example
let tick = process.tick();
// ticks are lazy by default, forces the later ticks to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let some_first_tick = tick.optional_first_tick(q!(123));
some_first_tick
.unwrap_or(tick.singleton(q!(456)))
.all_ticks()
// [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
Sourcepub fn into_singleton(self) -> Singleton<Option<T>, L, B>where
T: Clone,
pub fn into_singleton(self) -> Singleton<Option<T>, L, B>where
T: Clone,
Converts this optional into a Singleton
with a Rust Option
as its contents.
Useful for writing custom Rust code that needs to interact with both the null and non-null
states of the Optional
. When possible, you should use the native APIs on Optional
so that Hydro can skip any computation on null values.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the later ticks to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let some_first_tick = tick.optional_first_tick(q!(123));
some_first_tick.into_singleton().all_ticks()
// [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
Sourcepub fn ir_node_named(self, name: &str) -> Optional<T, L, B>
pub fn ir_node_named(self, name: &str) -> Optional<T, L, B>
An operator which allows you to “name” a HydroNode
.
This is only used for testing, to correlate certain HydroNode
s with IDs.
Source§impl<'a, T, L> Optional<T, L, Bounded>where
L: Location<'a>,
impl<'a, T, L> Optional<T, L, Bounded>where
L: Location<'a>,
Sourcepub fn filter_if_some<U>(
self,
signal: Optional<U, L, Bounded>,
) -> Optional<T, L, Bounded>
pub fn filter_if_some<U>( self, signal: Optional<U, L, Bounded>, ) -> Optional<T, L, Bounded>
Filters this optional, passing through the optional value if it is non-null and the
argument (a Bounded
Optional
`) is non-null, otherwise the output is null.
Useful for conditionally processing, such as only emitting an optional’s value outside a tick if some other condition is satisfied.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![456]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).first()
.filter_if_some(some_on_first_tick)
.unwrap_or(tick.singleton(q!(789)))
.all_ticks()
// [789, 789]
Sourcepub fn filter_if_none<U>(
self,
other: Optional<U, L, Bounded>,
) -> Optional<T, L, Bounded>
pub fn filter_if_none<U>( self, other: Optional<U, L, Bounded>, ) -> Optional<T, L, Bounded>
Filters this optional, passing through the optional value if it is non-null and the
argument (a Bounded
Optional
`) is null, otherwise the output is null.
Useful for conditionally processing, such as only emitting an optional’s value outside a tick if some other condition is satisfied.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![456]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).first()
.filter_if_none(some_on_first_tick)
.unwrap_or(tick.singleton(q!(789)))
.all_ticks()
// [789, 789]
Sourcepub fn if_some_then<U>(
self,
value: Singleton<U, L, Bounded>,
) -> Optional<U, L, Bounded>
pub fn if_some_then<U>( self, value: Singleton<U, L, Bounded>, ) -> Optional<U, L, Bounded>
If self
is null, emits a null optional, but if it non-null, emits value
.
Useful for gating the release of a Singleton
on a condition of the Optional
having a value, such as only releasing a piece of state if the node is the leader.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let some_on_first_tick = tick.optional_first_tick(q!(()));
some_on_first_tick
.if_some_then(tick.singleton(q!(456)))
.unwrap_or(tick.singleton(q!(123)))
// 456 (first tick) ~> 123 (second tick onwards)
Source§impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
Sourcepub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded>
pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded>
Returns an optional value corresponding to the latest snapshot of the optional
being atomically processed. The snapshot at tick t + 1
is guaranteed to include
at least all relevant data that contributed to the snapshot at tick t
. Furthermore,
all snapshots of this optional into the atomic-associated tick will observe the
same value each tick.
§Non-Determinism
Because this picks a snapshot of a optional whose value is continuously changing, the output optional has a non-deterministic value since the snapshot can be at an arbitrary point in time.
Sourcepub fn end_atomic(self) -> Optional<T, L, B>
pub fn end_atomic(self) -> Optional<T, L, B>
Returns this optional back into a top-level, asynchronous execution context where updates to the value will be asynchronously propagated.
Source§impl<'a, T, L, B: Boundedness> Optional<T, L, B>
impl<'a, T, L, B: Boundedness> Optional<T, L, B>
Sourcepub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B>
pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B>
Shifts this optional into an atomic context, which guarantees that any downstream logic
will observe the same version of the value and will be executed synchronously before any
outputs are yielded (in Optional::end_atomic
).
This is useful to enforce local consistency constraints, such as ensuring that several readers
see a consistent version of local state (since otherwise each Optional::snapshot
may pick
a different version).
Entering an atomic section requires a Tick
argument that declares where the optional will
be atomically processed. Snapshotting an optional into the same Tick
will preserve the
synchronous execution, and all such snapshots in the same Tick
will have the same value.
Sourcepub fn snapshot(
self,
tick: &Tick<L>,
nondet: NonDet,
) -> Optional<T, Tick<L>, Bounded>
pub fn snapshot( self, tick: &Tick<L>, nondet: NonDet, ) -> Optional<T, Tick<L>, Bounded>
Given a tick, returns a optional value corresponding to a snapshot of the optional
as of that tick. The snapshot at tick t + 1
is guaranteed to include at least all
relevant data that contributed to the snapshot at tick t
.
§Non-Determinism
Because this picks a snapshot of a optional whose value is continuously changing, the output optional has a non-deterministic value since the snapshot can be at an arbitrary point in time.
Sourcepub fn sample_eager(
self,
nondet: NonDet,
) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
pub fn sample_eager( self, nondet: NonDet, ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
Eagerly samples the optional as fast as possible, returning a stream of snapshots with order corresponding to increasing prefixes of data contributing to the optional.
§Non-Determinism
At runtime, the optional will be arbitrarily sampled as fast as possible, but due to non-deterministic batching and arrival of inputs, the output stream is non-deterministic.
Sourcepub fn sample_every(
self,
interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a,
nondet: NonDet,
) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
pub fn sample_every( self, interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a, nondet: NonDet, ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
Given a time interval, returns a stream corresponding to snapshots of the optional
value taken at various points in time. Because the input optional may be
Unbounded
, there are no guarantees on what these snapshots are other than they
represent the value of the optional given some prefix of the streams leading up to
it.
§Non-Determinism
The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.
Source§impl<'a, T, L> Optional<T, Tick<L>, Bounded>where
L: Location<'a>,
impl<'a, T, L> Optional<T, Tick<L>, Bounded>where
L: Location<'a>,
Sourcepub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce>
pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce>
Asynchronously yields the value of this singleton outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element (skipping null values).
Unlike Optional::latest
, the value computed in each tick is emitted separately,
producing one element in the output for each (non-null) tick. This is useful for batched
computations, where the results from each tick must be combined together.
§Example
input_batch // first tick: [], second tick: [1, 2, 3]
.max()
.all_ticks()
// [3]
Sourcepub fn all_ticks_atomic(
self,
) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce>
pub fn all_ticks_atomic( self, ) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce>
Synchronously yields the value of this optional outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element.
Unlike Optional::all_ticks
, this preserves synchronous execution, as the output stream
is emitted in an Atomic
context that will process elements synchronously with the input
optional’s Tick
context.
Sourcepub fn latest(self) -> Optional<T, L, Unbounded>
pub fn latest(self) -> Optional<T, L, Unbounded>
Asynchronously yields this optional outside the tick as an unbounded optional, which will be asynchronously updated with the latest value of the optional inside the tick, including whether the optional is null or not.
This converts a bounded value inside a tick into an asynchronous value outside the tick that tracks the inner value. This is useful for getting the value as of the “most recent” tick, but note that updates are propagated asynchronously outside the tick.
§Example
input_batch // first tick: [], second tick: [1, 2, 3]
.max()
.latest()
// asynchronously changes from None ~> 3
Sourcepub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded>
pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded>
Synchronously yields this optional outside the tick as an unbounded optional, which will be updated with the latest value of the optional inside the tick.
Unlike Optional::latest
, this preserves synchronous execution, as the output optional
is emitted in an Atomic
context that will process elements synchronously with the input
optional’s Tick
context.
pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded>
pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
Sourcepub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
Trait Implementations§
Source§impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
Source§impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>where
L: Location<'a>,
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>where
L: Location<'a>,
fn defer_tick(self) -> Self
Auto Trait Implementations§
impl<Type, Loc, Bound> !Freeze for Optional<Type, Loc, Bound>
impl<Type, Loc, Bound> !RefUnwindSafe for Optional<Type, Loc, Bound>
impl<Type, Loc, Bound> !Send for Optional<Type, Loc, Bound>
impl<Type, Loc, Bound> !Sync for Optional<Type, Loc, Bound>
impl<Type, Loc, Bound> Unpin for Optional<Type, Loc, Bound>
impl<Type, Loc, Bound> !UnwindSafe for Optional<Type, Loc, Bound>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more