Struct hydro_lang::stream::Stream
source · pub struct Stream<T, L, B, Order = TotalOrder> { /* private fields */ }
Expand description
An ordered sequence stream of elements of type T.
Type Parameters:
- T: the type of elements in the stream
- L: the location where the stream is being materialized
- B: the boundedness of the stream, which is either Bounded or Unbounded
- Order: the ordering of the stream, which is either TotalOrder or NoOrder (default is TotalOrder)
Implementations§
source§impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
pub fn map<U, F: Fn(T) -> U + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>
pub fn cloned(self) -> Stream<T, L, B, Order> where
T: Clone,
pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>
pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, NoOrder>
pub fn flatten_ordered<U>(self) -> Stream<U, L, B, Order> where
T: IntoIterator<Item = U>,
pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder> where
T: IntoIterator<Item = U>,
pub fn filter<F: Fn(&T) -> bool + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<T, L, B, Order>
pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, Order>
pub fn cross_singleton<O>(
self,
other: impl Into<Optional<O, L, Bounded>>,
) -> Stream<(T, O), L, B, Order> where
O: Clone,
source · pub fn continue_if<U>(
self,
signal: Optional<U, L, Bounded>,
) -> Stream<T, L, B, Order>
pub fn continue_if<U>( self, signal: Optional<U, L, Bounded>, ) -> Stream<T, L, B, Order>
Allow this stream through if the other stream has elements, otherwise the output is empty.
source · pub fn continue_unless<U>(
self,
other: Optional<U, L, Bounded>,
) -> Stream<T, L, B, Order>
pub fn continue_unless<U>( self, other: Optional<U, L, Bounded>, ) -> Stream<T, L, B, Order>
Allow this stream through if the other stream is empty, otherwise the output is empty.
pub fn cross_product<O>( self, other: Stream<O, L, B, Order>, ) -> Stream<(T, O), L, B, Order>
pub fn unique(self) -> Stream<T, L, B, Order>
pub fn filter_not_in<O2>( self, other: Stream<T, L, Bounded, O2>, ) -> Stream<T, L, Bounded, Order>
pub fn inspect<F: Fn(&T) + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<T, L, B, Order>
source · pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O>
pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O>
Explicitly “casts” the stream to a type with a different ordering guarantee. Useful in unsafe code where the ordering cannot be proven by the type system.
§Safety
This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.
source§impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
pub fn reduce_commutative<F: Fn(&mut T, T) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
pub fn max(self) -> Optional<T, L, B> where
T: Ord,
pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>( self, key: impl IntoQuotedMut<'a, F, L> + Copy, ) -> Optional<T, L, B>
pub fn min(self) -> Optional<T, L, B> where
T: Ord,
pub fn count(self) -> Singleton<usize, L, B>
source§impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder>
impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder>
pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder>
pub fn first(self) -> Optional<T, L, B>
pub fn last(self) -> Optional<T, L, B>
pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
pub fn reduce<F: Fn(&mut T, T) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
source§impl<'a, T, L: Location<'a>> Stream<T, L, Bounded, TotalOrder>
impl<'a, T, L: Location<'a>> Stream<T, L, Bounded, TotalOrder>
pub fn chain( self, other: Stream<T, L, Bounded, TotalOrder>, ) -> Stream<T, L, Bounded, TotalOrder>
source§impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded>
impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded>
pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded>
pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded>
source§impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order>
impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order>
pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, Order>
pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order>
pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, Order>
source§impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Timestamped<L>, B, Order>
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Timestamped<L>, B, Order>
source · pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order>
pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order>
Given a tick, returns a stream corresponding to a batch of elements for that tick. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.
§Safety
The batch boundaries are non-deterministic and may change across executions.
pub fn drop_timestamp(self) -> Stream<T, L, B, Order>
pub fn timestamp_source(&self) -> Tick<L>
source§impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream<T, L, B, Order>
impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream<T, L, B, Order>
pub fn timestamped(self, tick: &Tick<L>) -> Stream<T, Timestamped<L>, B, Order>
source · pub unsafe fn sample_every(
self,
interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a,
) -> Stream<T, L, Unbounded, Order>
pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a, ) -> Stream<T, L, Unbounded, Order>
Given a time interval, returns a stream corresponding to samples taken from the stream roughly at that interval. The output will have elements in the same order as the input, but with arbitrary elements skipped between samples. There is also no guarantee on the exact timing of the samples.
§Safety
The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.
source · pub unsafe fn timeout(
self,
duration: impl QuotedWithContext<'a, Duration, Tick<L>> + Copy + 'a,
) -> Optional<(), L, Unbounded>
pub unsafe fn timeout( self, duration: impl QuotedWithContext<'a, Duration, Tick<L>> + Copy + 'a, ) -> Optional<(), L, Unbounded>
Given a timeout duration, returns an Optional which will have a value if the stream has not emitted a value for that duration.
§Safety
Timeout relies on non-deterministic sampling of the stream, so depending on when samples take place, timeouts may be non-deterministically generated or missed, and the notification of the timeout may be delayed as well. There is also no guarantee on how long the Optional will have a value after the timeout is detected based on when the next sample is taken.
source§impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>
pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
pub fn dest_sink<S: Unpin + Sink<T> + 'a>( self, sink: impl QuotedWithContext<'a, S, L>, )
source§impl<'a, T, C1, B, Order> Stream<T, Cluster<'a, C1>, B, Order>
impl<'a, T, C1, B, Order> Stream<T, Cluster<'a, C1>, B, Order>
source§impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order>
pub fn decouple_process<P2>( self, other: &Process<'a, P2>, ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
pub fn send_bincode<L2: Location<'a>, CoreType>(
self,
other: &L2,
) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, Order::Min> where
L::Root: CanSend<'a, L2, In<CoreType> = T>,
CoreType: Serialize + DeserializeOwned,
Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
pub fn send_bincode_external<L2: 'a, CoreType>(
self,
other: &ExternalProcess<'_, L2>,
) -> ExternalBincodeStream<L::Out<CoreType>> where
L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
CoreType: Serialize + DeserializeOwned,
pub fn send_bytes<L2: Location<'a>>( self, other: &L2, ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, Order::Min>
pub fn send_bytes_external<L2: 'a>( self, other: &ExternalProcess<'_, L2>, ) -> ExternalBytesPort
pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
self,
other: &L2,
) -> Stream<CoreType, L2, Unbounded, Order::Min> where
L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
CoreType: Serialize + DeserializeOwned,
Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
pub fn send_bytes_interleaved<L2: Location<'a>, Tag>( self, other: &L2, ) -> Stream<Bytes, L2, Unbounded, Order::Min>
pub fn broadcast_bincode<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, Order::Min>
pub fn broadcast_bincode_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order::Min>
pub fn broadcast_bytes<C2: 'a>( self, other: &Cluster<'a, C2>, ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>, Cluster<'a, C2>, Unbounded, Order::Min>
pub fn broadcast_bytes_interleaved<C2: 'a, Tag>( self, other: &Cluster<'a, C2>, ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, Order::Min>
source§impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder>
impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder>
pub fn round_robin_bincode<C2: 'a>(
self,
other: &Cluster<'a, C2>,
) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min> where
L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
T: Clone + Serialize + DeserializeOwned,
TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
pub fn round_robin_bincode_interleaved<C2: 'a, Tag>(
self,
other: &Cluster<'a, C2>,
) -> Stream<T, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min> where
L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
T: Clone + Serialize + DeserializeOwned,
TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
pub fn round_robin_bytes<C2: 'a>(
self,
other: &Cluster<'a, C2>,
) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min> where
L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
T: Clone,
TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
pub fn round_robin_bytes_interleaved<C2: 'a, Tag>(
self,
other: &Cluster<'a, C2>,
) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, <TotalOrder as MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>>::Min> where
L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)> + 'a,
T: Clone,
TotalOrder: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
Trait Implementations§
source§impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, Order>
impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, Order>
source§impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>
impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>
source§impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, Order>
impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, Order>
fn complete(self, ident: Ident, expected_location: LocationId)
source§impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>
impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, Order>
fn complete(self, ident: Ident, expected_location: LocationId)
Auto Trait Implementations§
impl<T, L, B, Order = TotalOrder> !Freeze for Stream<T, L, B, Order>
impl<T, L, B, Order = TotalOrder> !RefUnwindSafe for Stream<T, L, B, Order>
impl<T, L, B, Order = TotalOrder> !Send for Stream<T, L, B, Order>
impl<T, L, B, Order = TotalOrder> !Sync for Stream<T, L, B, Order>
impl<T, L, B, Order> Unpin for Stream<T, L, B, Order>
impl<T, L, B, Order = TotalOrder> !UnwindSafe for Stream<T, L, B, Order>
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
(clone_to_uninit)
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more