Struct hydro_lang::stream::Stream

source ·
pub struct Stream<T, L, B, Order = TotalOrder> { /* private fields */ }
Expand description

An ordered sequence stream of elements of type T.

Type Parameters:

  • T: the type of elements in the stream
  • L: the location where the stream is being materialized
  • B: the boundedness of the stream, which is either Bounded or Unbounded
  • Order: the ordering of the stream, which is either TotalOrder or NoOrder (default is TotalOrder)

Implementations§

source§

impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>

source

pub fn map<U, F: Fn(T) -> U + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>

source

pub fn cloned(self) -> Stream<T, L, B, Order>
where T: Clone,

source

pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>

source

pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, NoOrder>

source

pub fn flatten_ordered<U>(self) -> Stream<U, L, B, Order>
where T: IntoIterator<Item = U>,

source

pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
where T: IntoIterator<Item = U>,

source

pub fn filter<F: Fn(&T) -> bool + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<T, L, B, Order>

source

pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>

source

pub fn cross_singleton<O>( self, other: impl Into<Optional<O, L, Bounded>>, ) -> Stream<(T, O), L, B, Order>
where O: Clone,

source

pub fn continue_if<U>( self, signal: Optional<U, L, Bounded>, ) -> Stream<T, L, B, Order>

Allow this stream through if the other stream has elements, otherwise the output is empty.

source

pub fn continue_unless<U>( self, other: Optional<U, L, Bounded>, ) -> Stream<T, L, B, Order>

Allow this stream through if the other stream is empty, otherwise the output is empty.

source

pub fn cross_product<O>( self, other: Stream<O, L, B, Order>, ) -> Stream<(T, O), L, B, Order>
where T: Clone, O: Clone,

source

pub fn unique(self) -> Stream<T, L, B, Order>
where T: Eq + Hash,

source

pub fn filter_not_in<O2>( self, other: Stream<T, L, Bounded, O2>, ) -> Stream<T, L, Bounded, Order>
where T: Eq + Hash,

source

pub fn inspect<F: Fn(&T) + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<T, L, B, Order>

source

pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O>

Explicitly “casts” the stream to a type with a different ordering guarantee. Useful in unsafe code where the ordering cannot be proven by the type-system.

§Safety

This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propogate into the guarantees for the rest of the program.

source§

impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
where Order: MinOrder<NoOrder, Min = NoOrder>,

source

pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>

source

pub fn reduce_commutative<F: Fn(&mut T, T) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>

source

pub fn max(self) -> Optional<T, L, B>
where T: Ord,

source

pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>( self, key: impl IntoQuotedMut<'a, F, L> + Copy, ) -> Optional<T, L, B>

source

pub fn min(self) -> Optional<T, L, B>
where T: Ord,

source

pub fn count(self) -> Singleton<usize, L, B>

source§

impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder>

source

pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder>

source

pub fn first(self) -> Optional<T, L, B>

source

pub fn last(self) -> Optional<T, L, B>

source

pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>

source

pub fn reduce<F: Fn(&mut T, T) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>

source§

impl<'a, T, L: Location<'a>> Stream<T, L, Bounded, TotalOrder>

source

pub fn chain( self, other: Stream<T, L, Bounded, TotalOrder>, ) -> Stream<T, L, Bounded, TotalOrder>

source§

impl<'a, T, L: Location<'a> + NoTick + NoTimestamp> Stream<T, L, Unbounded, NoOrder>

source

pub fn union( self, other: Stream<T, L, Unbounded, NoOrder>, ) -> Stream<T, L, Unbounded, NoOrder>

source§

impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order>

source

pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
where T: Ord,

source

pub fn union<B2, O2>( self, other: Stream<T, L, B2, O2>, ) -> Stream<T, L, B2, Order::Min>
where Order: MinOrder<O2>,

source§

impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order>

source

pub fn join<V2, O2>( self, n: Stream<(K, V2), L, B, O2>, ) -> Stream<(K, (V1, V2)), L, B, NoOrder>
where K: Eq + Hash,

source

pub fn anti_join<O2>( self, n: Stream<K, L, Bounded, O2>, ) -> Stream<(K, V1), L, B, Order>
where K: Eq + Hash,

source§

impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded>

source

pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded>

source

pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded>

source§

impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order>

source

pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, Order>

source

pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order>

source

pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, Order>

source§

impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Timestamped<L>, B, Order>

source

pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order>

Given a tick, returns a stream corresponding to a batch of elements for that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.

§Safety

The batch boundaries are non-deterministic and may change across executions.

source

pub fn drop_timestamp(self) -> Stream<T, L, B, Order>

source

pub fn timestamp_source(&self) -> Tick<L>

source§

impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream<T, L, B, Order>

source

pub fn timestamped(self, tick: &Tick<L>) -> Stream<T, Timestamped<L>, B, Order>

source

pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a, ) -> Stream<T, L, Unbounded, Order>

Given a time interval, returns a stream corresponding to samples taken from the stream roughly at that interval. The output will have elements in the same order as the input, but with arbitrary elements skipped between samples. There is also no guarantee on the exact timing of the samples.

§Safety

The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.

source

pub unsafe fn timeout( self, duration: impl QuotedWithContext<'a, Duration, Tick<L>> + Copy + 'a, ) -> Optional<(), L, Unbounded>
where Order: MinOrder<NoOrder, Min = NoOrder>,

Given a timeout duration, returns an Optional which will have a value if the stream has not emitted a value since that duration.

§Safety

Timeout relies on non-deterministic sampling of the stream, so depending on when samples take place, timeouts may be non-deterministically generated or missed, and the notification of the timeout may be delayed as well. There is also no guarantee on how long the Optional will have a value after the timeout is detected based on when the next sample is taken.

source§

impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>

source

pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)

source

pub fn dest_sink<S: Unpin + Sink<T> + 'a>( self, sink: impl QuotedWithContext<'a, S, L>, )

source§

impl<'a, T, L: Location<'a>, Order> Stream<T, Tick<L>, Bounded, Order>

source

pub fn all_ticks(self) -> Stream<T, Timestamped<L>, Unbounded, Order>

source

pub fn persist(self) -> Stream<T, Tick<L>, Bounded, Order>
where T: Clone,

source

pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, Order>

source

pub fn delta(self) -> Stream<T, Tick<L>, Bounded, Order>

source§

impl<'a, T, C1, B, Order> Stream<T, Cluster<'a, C1>, B, Order>

source

pub fn decouple_cluster<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
where Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>> + CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>, T: Clone + Serialize + DeserializeOwned, Order: MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,

source§

impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>

source

pub fn decouple_process<P2>( self, other: &Process<'a, P2>, ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
where L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>, T: Clone + Serialize + DeserializeOwned, Order: MinOrder<<L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>, Min = Order>,

source

pub fn send_bincode<L2: Location<'a>, CoreType>( self, other: &L2, ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, Order::Min>
where L::Root: CanSend<'a, L2, In<CoreType> = T>, CoreType: Serialize + DeserializeOwned, Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,

source

pub fn send_bincode_external<L2: 'a, CoreType>( self, other: &ExternalProcess<'_, L2>, ) -> ExternalBincodeStream<L::Out<CoreType>>
where L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>, CoreType: Serialize + DeserializeOwned,

source

pub fn send_bytes<L2: Location<'a>>( self, other: &L2, ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, Order::Min>
where L::Root: CanSend<'a, L2, In<Bytes> = T>, Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,

source

pub fn send_bytes_external<L2: 'a>( self, other: &ExternalProcess<'_, L2>, ) -> ExternalBytesPort
where L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,

source

pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>( self, other: &L2, ) -> Stream<CoreType, L2, Unbounded, Order::Min>
where L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>, CoreType: Serialize + DeserializeOwned, Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,

source

pub fn send_bytes_interleaved<L2: Location<'a>, Tag>( self, other: &L2, ) -> Stream<Bytes, L2, Unbounded, Order::Min>
where L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>, Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,

source

pub fn broadcast_bincode<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, Order::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>, T: Clone + Serialize + DeserializeOwned, Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,

source

pub fn broadcast_bincode_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,

source

pub fn broadcast_bytes<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>, Cluster<'a, C2>, Unbounded, Order::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a, T: Clone, Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,

source

pub fn broadcast_bytes_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, Order::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)> + 'a, T: Clone, Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,

source§

impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder>

source

pub fn round_robin_bincode<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>, T: Clone + Serialize + DeserializeOwned, TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,

source

pub fn round_robin_bincode_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<T, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,

source

pub fn round_robin_bytes<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a, T: Clone, TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,

source

pub fn round_robin_bytes_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min>
where L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)> + 'a, T: Clone, TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,

Trait Implementations§

source§

impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream<T, L, B, Order>

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, Order>

source§

type Location = L

source§

fn create_source(ident: Ident, location: L) -> Self

source§

impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>

source§

type Location = Tick<L>

source§

fn create_source(ident: Ident, location: Tick<L>) -> Self

source§

impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, Order>

source§

fn complete(self, ident: Ident, expected_location: LocationId)

source§

impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>

source§

fn complete(self, ident: Ident, expected_location: LocationId)

source§

impl<'a, T, L: Location<'a>, Order> DeferTick for Stream<T, Tick<L>, Bounded, Order>

source§

fn defer_tick(self) -> Self

source§

impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B>> for Stream<T, L, B, NoOrder>

source§

fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder>

Converts to this type from the input type.

Auto Trait Implementations§

§

impl<T, L, B, Order = TotalOrder> !Freeze for Stream<T, L, B, Order>

§

impl<T, L, B, Order = TotalOrder> !RefUnwindSafe for Stream<T, L, B, Order>

§

impl<T, L, B, Order = TotalOrder> !Send for Stream<T, L, B, Order>

§

impl<T, L, B, Order = TotalOrder> !Sync for Stream<T, L, B, Order>

§

impl<T, L, B, Order> Unpin for Stream<T, L, B, Order>
where L: Unpin, T: Unpin, B: Unpin, Order: Unpin,

§

impl<T, L, B, Order = TotalOrder> !UnwindSafe for Stream<T, L, B, Order>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> CloneToUninit for T
where T: Clone,

source§

unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoEither for T

source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§

impl<T> MinOrder<T> for T

source§

type Min = T

The weaker of the two orderings.
source§

impl<T> ToOwned for T
where T: Clone,

source§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more