Singleton

Struct Singleton 

Source
pub struct Singleton<Type, Loc, Bound: Boundedness> { /* private fields */ }

A single Rust value that can asynchronously change over time.

If the singleton is Bounded, the value is frozen and will not change. But if it is Unbounded, the value will asynchronously change over time.

Singletons are often used to capture state in a Hydro program, such as an event counter which is a single number that will asynchronously change as events are processed. Singletons also appear when dealing with bounded collections, to perform regular Rust computations on concrete values, such as getting the length of a batch of requests.

Type Parameters:

  • Type: the type of the value in this singleton
  • Loc: the Location where the singleton is materialized
  • Bound: tracks whether the value is Bounded (fixed) or Unbounded (changing asynchronously)

Implementations§

Source§

impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
where L: Location<'a>,

Source

pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
where F: Fn(T) -> U + 'a,

Transforms the singleton value by applying a function f to it, continuously as the input is updated.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(5));
singleton.map(q!(|v| v * 2)).all_ticks()
// 10
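
As a rough plain-Rust analogy (not the Hydro API), a singleton's history can be thought of as the sequence of values it takes on, with `map` applying `f` to each new version. The helper name `map_versions` is hypothetical and exists only for this sketch.

```rust
/// Models a singleton as the sequence of values it takes on over time
/// and applies `f` to every version, as `map` does whenever the input updates.
/// `map_versions` is a hypothetical helper, not part of Hydro.
fn map_versions<T, U>(versions: impl IntoIterator<Item = T>, f: impl Fn(T) -> U) -> Vec<U> {
    versions.into_iter().map(f).collect()
}
```

With a single version `5` and `f = |v| v * 2`, the model yields a single output version, matching the example above.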
Source

pub fn flat_map_ordered<U, I, F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
where I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a,

Transforms the singleton value by applying a function f to it and then flattening the result into a stream, preserving the order of elements.

The function f is applied to the singleton value to produce an iterator, and all items from that iterator are emitted in the output stream in deterministic order.

The implementation of Iterator for the output type I must produce items in a deterministic order. For example, I could be a Vec, but not a HashSet. If the order is not deterministic, use Singleton::flat_map_unordered instead.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(vec![1, 2, 3]));
singleton.flat_map_ordered(q!(|v| v)).all_ticks()
// 1, 2, 3
Source

pub fn flat_map_unordered<U, I, F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
where I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a,

Like Singleton::flat_map_ordered, but allows the implementation of Iterator for the output type I to produce items in any order.

The function f is applied to the singleton value to produce an iterator, and all items from that iterator are emitted in the output stream in non-deterministic order.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(
    std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
singleton.flat_map_unordered(q!(|v| v)).all_ticks()
// 1, 2, 3, but in no particular order
Source

pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
where T: IntoIterator<Item = U>,

Flattens the singleton value into a stream, preserving the order of elements.

The singleton value must implement IntoIterator, and all items from that iterator are emitted in the output stream in deterministic order.

The implementation of Iterator for the element type T must produce items in a deterministic order. For example, T could be a Vec, but not a HashSet. If the order is not deterministic, use Singleton::flatten_unordered instead.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(vec![1, 2, 3]));
singleton.flatten_ordered().all_ticks()
// 1, 2, 3
Source

pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
where T: IntoIterator<Item = U>,

Like Singleton::flatten_ordered, but allows the implementation of Iterator for the element type T to produce items in any order.

The singleton value must implement IntoIterator, and all items from that iterator are emitted in the output stream in non-deterministic order.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(
    std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
));
singleton.flatten_unordered().all_ticks()
// 1, 2, 3, but in no particular order
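
The difference between the ordered and unordered variants comes down to whether the iterator's order can be relied upon. The sketch below models this in plain Rust (it does not use Hydro; `flat_map_model` and `flat_map_unordered_sorted` are hypothetical names). Flattening is the special case where `f` is the identity.

```rust
use std::collections::HashSet;

/// Applies `f` to a single value and collects every item of the resulting iterator.
/// Hypothetical stand-in for the flat_map / flatten family on a singleton.
fn flat_map_model<T, I: IntoIterator>(value: T, f: impl Fn(T) -> I) -> Vec<I::Item> {
    f(value).into_iter().collect()
}

/// A `HashSet` iterates in an unspecified order, so the result is sorted
/// before anyone relies on the position of an element.
fn flat_map_unordered_sorted(set: HashSet<i32>) -> Vec<i32> {
    let mut items = flat_map_model(set, |s| s);
    items.sort();
    items
}
```

A `Vec` input can be compared element by element against the expected output, while a `HashSet` input should only be compared after sorting (or as a set), which is the guarantee that `NoOrder` encodes in the output stream type.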
Source

pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
where F: Fn(&T) -> bool + 'a,

Creates an optional containing the singleton value if it satisfies a predicate f.

If the predicate returns true, the output optional contains the same value. If the predicate returns false, the output optional is empty.

The closure f receives a reference &T rather than an owned value T because filtering does not modify or take ownership of the value. If you need to modify the value while filtering, use Singleton::filter_map instead.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(5));
singleton.filter(q!(|&x| x > 3)).all_ticks()
// 5
Source

pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Optional<U, L, B>
where F: Fn(T) -> Option<U> + 'a,

An operator that both filters and maps. It yields the value only if the supplied closure f returns Some(value).

If the closure returns Some(new_value), the output optional contains new_value. If the closure returns None, the output optional is empty.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!("42"));
singleton
    .filter_map(q!(|s| s.parse::<i32>().ok()))
    .all_ticks()
// 42
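
For a single point in time, `filter` and `filter_map` behave much like the corresponding operations on a plain `Option`. The following sketch assumes that reading and uses only the standard library; `filter_model` and `filter_map_model` are hypothetical names, not Hydro functions.

```rust
/// Keeps the value only if the predicate holds; the predicate borrows the value.
fn filter_model<T>(value: T, pred: impl Fn(&T) -> bool) -> Option<T> {
    Some(value).filter(pred)
}

/// Filters and transforms in one step; the closure takes ownership of the value.
fn filter_map_model<T, U>(value: T, f: impl Fn(T) -> Option<U>) -> Option<U> {
    f(value)
}
```

Here `filter_model(5, |&x| x > 3)` keeps the value, and `filter_map_model("42", |s: &str| s.parse::<i32>().ok())` parses it into an integer, mirroring the two examples above.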
Source

pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
where Self: ZipResult<'a, O, Location = L>,

Combines this singleton with another Singleton or Optional by tupling their values.

If the other value is a Singleton, the output will be a Singleton, but if it is an Optional, the output will be an Optional that is non-null only if the argument is non-null. This is useful for combining several pieces of state together.

§Example
let tick = process.tick();
let numbers = process
  .source_iter(q!(vec![123, 456]))
  .batch(&tick, nondet!(/** test */));
let count = numbers.clone().count(); // Singleton
let max = numbers.max(); // Optional
count.zip(max).all_ticks()
// [(2, 456)]
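
The rule for the output type can be sketched with ordinary tuples and `Option`, treating a Singleton as a plain value and an Optional as an `Option`. This is an analogy under that assumption, not Hydro code; all three function names are hypothetical.

```rust
/// Singleton zipped with Singleton: always produces a pair.
fn zip_singleton<A, B>(a: A, b: B) -> (A, B) {
    (a, b)
}

/// Singleton zipped with Optional: produces a pair only when the optional is present.
fn zip_optional<A, B>(a: A, b: Option<B>) -> Option<(A, B)> {
    b.map(|b| (a, b))
}

/// Mirrors the example above: the count always exists, the maximum may not.
fn count_and_max(numbers: &[i32]) -> Option<(usize, i32)> {
    zip_optional(numbers.len(), numbers.iter().copied().max())
}
```

For an empty batch the count is still defined (zero), but the maximum is absent, so the zipped result is absent as well.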
Source

pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>

Filters this singleton into an Optional, passing through the singleton value if the argument (a Bounded Optional) is non-null, otherwise the output is null.

Useful for conditional processing, such as only emitting a singleton’s value outside a tick if some other condition is satisfied.

§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));

let batch_first_tick = process
  .source_iter(q!(vec![1]))
  .batch(&tick, nondet!(/** test */));
let batch_second_tick = process
  .source_iter(q!(vec![1, 2, 3]))
  .batch(&tick, nondet!(/** test */))
  .defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).count()
  .filter_if_some(some_on_first_tick)
  .all_ticks()
// [1]
Source

pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>

Filters this singleton into an Optional, passing through the singleton value if the argument (a Bounded Optional) is null, otherwise the output is null.

Like Singleton::filter_if_some, this is useful for conditional processing, but inverts the condition.

§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));

let batch_first_tick = process
  .source_iter(q!(vec![1]))
  .batch(&tick, nondet!(/** test */));
let batch_second_tick = process
  .source_iter(q!(vec![1, 2, 3]))
  .batch(&tick, nondet!(/** test */))
  .defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick).count()
  .filter_if_none(some_on_first_tick)
  .all_ticks()
// [3]
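
The two gating operators can be modeled per tick with `Option`, which may help when reading the two examples above side by side. This is a standard-library sketch under the assumption that each tick carries one count and one signal; the function names are hypothetical and are not Hydro APIs.

```rust
/// Passes the value through only when the signal is present.
fn filter_if_some_model<T, U>(value: T, signal: Option<U>) -> Option<T> {
    signal.map(|_| value)
}

/// Passes the value through only when the signal is absent.
fn filter_if_none_model<T, U>(value: T, signal: Option<U>) -> Option<T> {
    match signal {
        Some(_) => None,
        None => Some(value),
    }
}

/// Applies one gate per tick and collects the values that pass,
/// similar in spirit to calling `all_ticks()` on the gated result.
fn gate_ticks(counts: &[usize], signals: &[Option<()>], pass_if_some: bool) -> Vec<usize> {
    counts
        .iter()
        .zip(signals.iter())
        .filter_map(|(&count, &signal)| {
            if pass_if_some {
                filter_if_some_model(count, signal)
            } else {
                filter_if_none_model(count, signal)
            }
        })
        .collect()
}
```

With per-tick counts `[1, 3]` and a signal that is present only on the first tick, gating on "some" keeps the first count and gating on "none" keeps the second.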
Source

pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B>

An operator which allows you to “name” a HydroNode. This is only used for testing, to correlate certain HydroNodes with IDs.

Source§

impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
where L: Location<'a> + NoTick,

Source

pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>

Returns a singleton value corresponding to the latest snapshot of the singleton being atomically processed. The snapshot at tick t + 1 is guaranteed to include at least all relevant data that contributed to the snapshot at tick t. Furthermore, all snapshots of this singleton into the atomic-associated tick will observe the same value each tick.

§Non-Determinism

Because this picks a snapshot of a singleton whose value is continuously changing, the output singleton has a non-deterministic value since the snapshot can be at an arbitrary point in time.

Source

pub fn end_atomic(self) -> Optional<T, L, B>

Returns this singleton back into a top-level, asynchronous execution context where updates to the value will be asynchronously propagated.

Source§

impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
where L: Location<'a> + NoTick + NoAtomic,

Source

pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B>

Shifts this singleton into an atomic context, which guarantees that any downstream logic will observe the same version of the value and will be executed synchronously before any outputs are yielded (in Singleton::end_atomic).

This is useful to enforce local consistency constraints, such as ensuring that several readers see a consistent version of local state (since otherwise each Singleton::snapshot may pick a different version).

Entering an atomic section requires a Tick argument that declares where the singleton will be atomically processed. Snapshotting a singleton into the same Tick will preserve the synchronous execution, and all such snapshots in the same Tick will have the same value.

Source

pub fn snapshot( self, tick: &Tick<L>, nondet: NonDet, ) -> Singleton<T, Tick<L>, Bounded>
where L: NoTick,

Given a tick, returns a singleton value corresponding to a snapshot of the singleton as of that tick. The snapshot at tick t + 1 is guaranteed to include at least all relevant data that contributed to the snapshot at tick t.

§Non-Determinism

Because this picks a snapshot of a singleton whose value is continuously changing, the output singleton has a non-deterministic value since the snapshot can be at an arbitrary point in time.
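
One way to picture the guarantee is to treat the singleton as a fold over a prefix of its input events: each snapshot sees some prefix, and later snapshots never see a shorter one. The sketch below is a plain-Rust model under that assumption (a running sum stands in for the singleton, and `observed` is a hypothetical list of prefix lengths chosen by the runtime); it is not how Hydro implements snapshots.

```rust
/// Value of a running-sum "singleton" after the first `prefix_len` events.
fn value_at_prefix(events: &[u32], prefix_len: usize) -> u32 {
    events.iter().take(prefix_len).sum()
}

/// Produces one snapshot per tick. The running maximum over `observed`
/// ensures each snapshot includes everything the previous snapshot saw.
fn snapshots(events: &[u32], observed: &[usize]) -> Vec<u32> {
    let mut seen = 0usize;
    observed
        .iter()
        .map(|&n| {
            seen = seen.max(n);
            value_at_prefix(events, seen)
        })
        .collect()
}
```

Which prefix lengths occur is the non-deterministic part; the model only fixes that they never shrink from one tick to the next.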

Source

pub fn sample_eager( self, nondet: NonDet, ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>

Eagerly samples the singleton as fast as possible, returning a stream of snapshots with order corresponding to increasing prefixes of data contributing to the singleton.

§Non-Determinism

At runtime, the singleton will be arbitrarily sampled as fast as possible, but due to non-deterministic batching and arrival of inputs, the output stream is non-deterministic.

Source

pub fn sample_every( self, interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a, nondet: NonDet, ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>

Given a time interval, returns a stream corresponding to snapshots of the singleton value taken at various points in time. Because the input singleton may be Unbounded, there are no guarantees on what these snapshots are other than that they represent the value of the singleton given some prefix of the streams leading up to it.

§Non-Determinism

The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.

Source§

impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
where L: Location<'a>,

Source

pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce>

Asynchronously yields the value of this singleton outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element.

Unlike Singleton::latest, the value computed in each tick is emitted separately, producing one element in the output for each tick. This is useful for batched computations, where the results from each tick must be combined together.

§Example
let tick = process.tick();
input_batch // first tick: [1], second tick: [1, 2, 3]
    .count()
    .all_ticks()
// [1, 3]
Source

pub fn all_ticks_atomic( self, ) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce>

Synchronously yields the value of this singleton outside the tick as an unbounded stream, which will stream the value computed in each tick as a separate stream element.

Unlike Singleton::all_ticks, this preserves synchronous execution, as the output stream is emitted in an Atomic context that will process elements synchronously with the input singleton’s Tick context.

Source

pub fn latest(self) -> Singleton<T, L, Unbounded>

Asynchronously yields this singleton outside the tick as an unbounded singleton, which will be asynchronously updated with the latest value of the singleton inside the tick.

This converts a bounded value inside a tick into an asynchronous value outside the tick that tracks the inner value. This is useful for getting the value as of the “most recent” tick, but note that updates are propagated asynchronously outside the tick.

§Example
let tick = process.tick();
input_batch // first tick: [1], second tick: [1, 2, 3]
    .count()
    .latest()
// asynchronously changes from 1 ~> 3
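
The contrast between `all_ticks` and `latest` can be sketched by modeling the per-tick values as a slice: `all_ticks` keeps every value as its own element, while `latest` eventually settles on the most recent one. This is a standard-library analogy with hypothetical function names; in particular, the real `latest` updates asynchronously, so an observer may or may not see intermediate values.

```rust
/// Computes one count per tick from the batch visible in that tick.
fn counts_per_tick(batches: &[Vec<i32>]) -> Vec<usize> {
    batches.iter().map(|batch| batch.len()).collect()
}

/// Model of `all_ticks`: one output element per tick, in tick order.
fn all_ticks_model(per_tick: &[usize]) -> Vec<usize> {
    per_tick.to_vec()
}

/// Model of `latest` once all ticks have run: only the most recent value remains.
fn latest_model(per_tick: &[usize]) -> Option<usize> {
    per_tick.last().copied()
}
```

For batches `[1]` and `[1, 2, 3]`, the per-tick counts are 1 and 3; the first model keeps both, the second ends at 3.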
Source

pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded>

Synchronously yields this singleton outside the tick as an unbounded singleton, which will be updated with the latest value of the singleton inside the tick.

Unlike Singleton::latest, this preserves synchronous execution, as the output singleton is emitted in an Atomic context that will process elements synchronously with the input singleton’s Tick context.

Source

pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded>

Source

pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>

👎Deprecated: use .into_stream().persist()
Source

pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>

Converts this singleton into a Stream containing a single element, the value.

§Example
let tick = process.tick();
let batch_input = process
  .source_iter(q!(vec![123, 456]))
  .batch(&tick, nondet!(/** test */));
batch_input.clone().chain(
  batch_input.count().into_stream()
).all_ticks()
// [123, 456, 2]
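
In iterator terms, `into_stream` resembles `std::iter::once`: it turns one value into a one-element sequence that can be chained with others. The sketch below reproduces the example's result for a single tick using only the standard library; `chain_with_count` is a hypothetical name.

```rust
/// Appends the batch's element count to the batch itself as one extra element.
fn chain_with_count(batch: &[usize]) -> Vec<usize> {
    batch
        .iter()
        .copied()
        .chain(std::iter::once(batch.len()))
        .collect()
}
```

Calling `chain_with_count(&[123, 456])` produces the batch followed by its length.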

Trait Implementations§

Source§

impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
where T: Clone, L: Location<'a>,

Source§

fn clone(&self) -> Self

Returns a duplicate of the value.
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
where L: Location<'a>,

Source§

fn defer_tick(self) -> Self

Source§

impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
where L: Location<'a>,

Source§

fn from(singleton: Singleton<T, L, B>) -> Self

Converts to this type from the input type.
Source§

impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
where L: Location<'a>,

Source§

fn from(singleton: Singleton<T, L, Bounded>) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

§

impl<Type, Loc, Bound> !Freeze for Singleton<Type, Loc, Bound>

§

impl<Type, Loc, Bound> !RefUnwindSafe for Singleton<Type, Loc, Bound>

§

impl<Type, Loc, Bound> !Send for Singleton<Type, Loc, Bound>

§

impl<Type, Loc, Bound> !Sync for Singleton<Type, Loc, Bound>

§

impl<Type, Loc, Bound> Unpin for Singleton<Type, Loc, Bound>
where Loc: Unpin, Type: Unpin, Bound: Unpin,

§

impl<Type, Loc, Bound> !UnwindSafe for Singleton<Type, Loc, Bound>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
Source§

impl<T> DynClone for T
where T: Clone,

Source§

fn __clone_box(&self, _: Private) -> *mut ()

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow only if self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Creates a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
§

impl<T> ErasedDestructor for T
where T: 'static,