pub struct Stream<Type, Loc, Bound: Boundedness, Order: Ordering = TotalOrder, Retry: Retries = ExactlyOnce> { /* private fields */ }
Expand description
Streaming sequence of elements with type Type
.
This live collection represents a growing sequence of elements, with new elements being asynchronously appended to the end of the sequence. This can be used to model the arrival of network input, such as API requests, or streaming ingestion.
By default, all streams have deterministic ordering and each element is materialized exactly
once. But streams can also capture non-determinism via the Order
and Retries
type
parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
on the stream. For example, if the stream is unordered, you cannot invoke Stream::first
.
Type Parameters:
Type
: the type of elements in the streamLoc
: the location where the stream is being materializedBound
: the boundedness of the stream, which is eitherBounded
orUnbounded
Order
: the ordering of the stream, which is eitherTotalOrder
orNoOrder
(default isTotalOrder
)Retries
: the retry guarantee of the stream, which is eitherExactlyOnce
orAtLeastOnce
(default isExactlyOnce
)
Implementations§
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R>
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R>
Sourcepub fn send_bincode<L2>(
self,
other: &Process<'a, L2>,
) -> Stream<T, Process<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
pub fn send_bincode<L2>(
self,
other: &Process<'a, L2>,
) -> Stream<T, Process<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
“Moves” elements of this stream to a new distributed location by sending them over the network,
using bincode
to serialize/deserialize messages.
The returned stream captures the elements received at the destination, where values will
asynchronously arrive over the network. Sending from a Process
to another Process
preserves ordering and retries guarantees by using a single TCP channel to send the values. The
recipient is guaranteed to receive a prefix or the sent messages; if the TCP connection is
dropped no further messages will be sent.
§Example
let p1 = flow.process::<()>();
let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
// 1, 2, 3
Sourcepub fn broadcast_bincode<L2: 'a>(
self,
other: &Cluster<'a, L2>,
nondet_membership: NonDet,
) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
pub fn broadcast_bincode<L2: 'a>( self, other: &Cluster<'a, L2>, nondet_membership: NonDet, ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
Broadcasts elements of this stream to all members of a cluster by sending them over the network,
using bincode
to serialize/deserialize messages.
Each element in the stream will be sent to every member of the cluster based on the latest
membership information. This is a common pattern in distributed systems for broadcasting data to
all nodes in a cluster. Unlike Stream::demux_bincode
, which requires (MemberId, T)
tuples to
target specific members, broadcast_bincode
takes a stream of only data elements and sends
each element to all cluster members.
§Non-Determinism
The set of cluster members may asynchronously change over time. Each element is only broadcast to the current cluster members at that point in time. Depending on when we are notified of membership changes, we will broadcast each element to different members.
§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
// if there are 4 members in the cluster, each receives one element
// - MemberId::<()>(0): [123]
// - MemberId::<()>(1): [123]
// - MemberId::<()>(2): [123]
// - MemberId::<()>(3): [123]
Sourcepub fn send_bincode_external<L2>(
self,
other: &External<'_, L2>,
) -> ExternalBincodeStream<T>where
T: Serialize + DeserializeOwned,
pub fn send_bincode_external<L2>(
self,
other: &External<'_, L2>,
) -> ExternalBincodeStream<T>where
T: Serialize + DeserializeOwned,
Sends the elements of this stream to an external (non-Hydro) process, using bincode
serialization. The external process can receive these elements by establishing a TCP
connection and decoding using [tokio_util::codec::LengthDelimitedCodec
].
§Example
let flow = FlowBuilder::new();
let process = flow.process::<()>();
let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
let external = flow.external::<()>();
let external_handle = numbers.send_bincode_external(&external);
let mut deployment = hydro_deploy::Deployment::new();
let nodes = flow
.with_process(&process, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
// establish the TCP connection
let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
deployment.start().await.unwrap();
for w in 1..=3 {
assert_eq!(external_recv_stream.next().await, Some(w));
}
Source§impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
Sourcepub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
pub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
Sends elements of this stream to specific members of a cluster, identified by a MemberId
,
using bincode
to serialize/deserialize messages.
Each element in the stream must be a tuple (MemberId<L2>, T)
where the first element
specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode
,
this API allows precise targeting of specific cluster members rather than broadcasting to
all members.
§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
.demux_bincode(&workers);
// if there are 4 members in the cluster, each receives one element
// - MemberId::<()>(0): [0]
// - MemberId::<()>(1): [1]
// - MemberId::<()>(2): [2]
// - MemberId::<()>(3): [3]
Source§impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce>
impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce>
Sourcepub fn round_robin_bincode<L2: 'a>(
self,
other: &Cluster<'a, L2>,
nondet_membership: NonDet,
) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>where
T: Serialize + DeserializeOwned,
pub fn round_robin_bincode<L2: 'a>(
self,
other: &Cluster<'a, L2>,
nondet_membership: NonDet,
) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>where
T: Serialize + DeserializeOwned,
Distributes elements of this stream to cluster members in a round-robin fashion, using
bincode
to serialize/deserialize messages.
This provides load balancing by evenly distributing work across cluster members. The distribution is deterministic based on element order - the first element goes to member 0, the second to member 1, and so on, wrapping around when reaching the end of the member list.
§Non-Determinism
The set of cluster members may asynchronously change over time. Each element is distributed based on the current cluster membership at that point in time. Depending on when cluster members join and leave, the round-robin pattern will change. Furthermore, even when the membership is stable, the order of members in the round-robin pattern may change across runs.
§Ordering Requirements
This method is only available on streams with TotalOrder
and ExactlyOnce
, since the
order of messages and retries affects the round-robin pattern.
§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
on_worker.send_bincode(&p2)
// with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
// - MemberId::<()>(?): [1]
// - MemberId::<()>(?): [2]
// - MemberId::<()>(?): [3]
// - MemberId::<()>(?): [4]
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R>
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R>
Sourcepub fn send_bincode<L2>(
self,
other: &Process<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
pub fn send_bincode<L2>(
self,
other: &Process<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
“Moves” elements of this stream from a cluster to a process by sending them over the network,
using bincode
to serialize/deserialize messages.
Each cluster member sends its local stream elements, and they are collected at the destination
as a KeyedStream
where keys identify the source cluster member.
§Example
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
// if there are 4 members in the cluster, we should receive 4 elements
// { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
If you don’t need to know the source for each element, you can use .values()
to get just the data:
let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
Sourcepub fn broadcast_bincode<L2: 'a>(
self,
other: &Cluster<'a, L2>,
nondet_membership: NonDet,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
pub fn broadcast_bincode<L2: 'a>( self, other: &Cluster<'a, L2>, nondet_membership: NonDet, ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
Broadcasts elements of this stream at each source member to all members of a destination
cluster, using bincode
to serialize/deserialize messages.
Each source member sends each of its stream elements to every member of the cluster
based on its latest membership information. Unlike Stream::demux_bincode
, which requires
(MemberId, T)
tuples to target specific members, broadcast_bincode
takes a stream of
only data elements and sends each element to all cluster members.
§Non-Determinism
The set of cluster members may asynchronously change over time. Each element is only broadcast to the current cluster members known at that point in time at the source member. Depending on when each source member is notified of membership changes, it will broadcast each element to different members.
§Example
let source: Cluster<Source> = flow.cluster::<Source>();
let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
let destination: Cluster<Destination> = flow.cluster::<Destination>();
let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
// if there are 4 members in the desination, each receives one element from each source member
// - Destination(0): { Source(0): [123], Source(1): [123], ... }
// - Destination(1): { Source(0): [123], Source(1): [123], ... }
// - ...
Source§impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
Sourcepub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
pub fn demux_bincode(
self,
other: &Cluster<'a, L2>,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>where
T: Serialize + DeserializeOwned,
Sends elements of this stream at each source member to specific members of a destination
cluster, identified by a MemberId
, using bincode
to serialize/deserialize messages.
Each element in the stream must be a tuple (MemberId<L2>, T)
where the first element
specifies which cluster member should receive the data. Unlike Stream::broadcast_bincode
,
this API allows precise targeting of specific cluster members rather than broadcasting to
all members.
Each cluster member sends its local stream elements, and they are collected at each
destination member as a KeyedStream
where keys identify the source cluster member.
§Example
let source: Cluster<Source> = flow.cluster::<Source>();
let to_send: Stream<_, Cluster<_>, _> = source
.source_iter(q!(vec![0, 1, 2, 3]))
.map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
let destination: Cluster<Destination> = flow.cluster::<Destination>();
let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
// if there are 4 members in the destination cluster, each receives one message from each source member
// - Destination(0): { Source(0): [0], Source(1): [0], ... }
// - Destination(1): { Source(0): [1], Source(1): [1], ... }
// - ...
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>where
L: Location<'a>,
Sourcepub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>where
F: Fn(T) -> U + 'a,
pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>where
F: Fn(T) -> U + 'a,
Produces a stream based on invoking f
on each element.
If you do not want to modify the stream and instead only want to view
each item use Stream::inspect
instead.
§Example
let words = process.source_iter(q!(vec!["hello", "world"]));
words.map(q!(|x| x.to_uppercase()))
Sourcepub fn flat_map_ordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, O, R>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
pub fn flat_map_ordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, O, R>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
For each item i
in the input stream, transform i
using f
and then treat the
result as an Iterator
to produce items one by one. The implementation for Iterator
for the output type U
must produce items in a deterministic order.
For example, U
could be a Vec
, but not a HashSet
. If the order of the items in U
is
not deterministic, use Stream::flat_map_unordered
instead.
§Example
process
.source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
.flat_map_ordered(q!(|x| x))
// 1, 2, 3, 4
Sourcepub fn flat_map_unordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder, R>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
pub fn flat_map_unordered<U, I, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, NoOrder, R>where
I: IntoIterator<Item = U>,
F: Fn(T) -> I + 'a,
Like Stream::flat_map_ordered
, but allows the implementation of Iterator
for the output type U
to produce items in any order.
§Example
process
.source_iter(q!(vec![
std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
std::collections::HashSet::from_iter(vec![3, 4]),
]))
.flat_map_unordered(q!(|x| x))
// 1, 2, 3, 4, but in no particular order
Sourcepub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>where
T: IntoIterator<Item = U>,
pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>where
T: IntoIterator<Item = U>,
For each item i
in the input stream, treat i
as an Iterator
and produce its items one by one.
The implementation for Iterator
for the element type T
must produce items in a deterministic order.
For example, T
could be a Vec
, but not a HashSet
. If the order of the items in T
is
not deterministic, use Stream::flatten_unordered
instead.
process
.source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
.flatten_ordered()
// 1, 2, 3, 4
Sourcepub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>where
T: IntoIterator<Item = U>,
pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>where
T: IntoIterator<Item = U>,
Like Stream::flatten_ordered
, but allows the implementation of Iterator
for the element type T
to produce items in any order.
§Example
process
.source_iter(q!(vec![
std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
std::collections::HashSet::from_iter(vec![3, 4]),
]))
.flatten_unordered()
// 1, 2, 3, 4, but in no particular order
Sourcepub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
Creates a stream containing only the elements of the input stream that satisfy a predicate
f
, preserving the order of the elements.
The closure f
receives a reference &T
rather than an owned value T
because filtering does
not modify or take ownership of the values. If you need to modify the values while filtering
use Stream::filter_map
instead.
§Example
process
.source_iter(q!(vec![1, 2, 3, 4]))
.filter(q!(|&x| x > 2))
// 3, 4
Sourcepub fn filter_map<U, F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, O, R>
pub fn filter_map<U, F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, O, R>
An operator that both filters and maps. It yields only the items for which the supplied closure f
returns Some(value)
.
§Example
process
.source_iter(q!(vec!["1", "hello", "world", "2"]))
.filter_map(q!(|s| s.parse::<usize>().ok()))
// 1, 2
Sourcepub fn cross_singleton<O2>(
self,
other: impl Into<Optional<O2, L, Bounded>>,
) -> Stream<(T, O2), L, B, O, R>where
O2: Clone,
pub fn cross_singleton<O2>(
self,
other: impl Into<Optional<O2, L, Bounded>>,
) -> Stream<(T, O2), L, B, O, R>where
O2: Clone,
Generates a stream that maps each input element i
to a tuple (i, x)
,
where x
is the final value of other
, a bounded Singleton
.
§Example
let tick = process.tick();
let batch = process
.source_iter(q!(vec![1, 2, 3, 4]))
.batch(&tick, nondet!(/** test */));
let count = batch.clone().count(); // `count()` returns a singleton
batch.cross_singleton(count).all_ticks()
// (1, 4), (2, 4), (3, 4), (4, 4)
Sourcepub fn filter_if_some<U>(
self,
signal: Optional<U, L, Bounded>,
) -> Stream<T, L, B, O, R>
pub fn filter_if_some<U>( self, signal: Optional<U, L, Bounded>, ) -> Stream<T, L, B, O, R>
Passes this stream through if the argument (a Bounded
Optional
`) is non-null, otherwise the output is empty.
Useful for gating the release of elements based on a condition, such as only processing requests if you are the leader of a cluster.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![1, 2, 3, 4]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![5, 6, 7, 8]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick)
.filter_if_some(some_on_first_tick)
.all_ticks()
// [1, 2, 3, 4]
Sourcepub fn filter_if_none<U>(
self,
other: Optional<U, L, Bounded>,
) -> Stream<T, L, B, O, R>
pub fn filter_if_none<U>( self, other: Optional<U, L, Bounded>, ) -> Stream<T, L, B, O, R>
Passes this stream through if the argument (a Bounded
Optional
`) is null, otherwise the output is empty.
Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing some local state.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![1, 2, 3, 4]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![5, 6, 7, 8]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
let some_on_first_tick = tick.optional_first_tick(q!(()));
batch_first_tick.chain(batch_second_tick)
.filter_if_none(some_on_first_tick)
.all_ticks()
// [5, 6, 7, 8]
Sourcepub fn cross_product<T2, O2: Ordering>(
self,
other: Stream<T2, L, B, O2, R>,
) -> Stream<(T, T2), L, B, NoOrder, R>
pub fn cross_product<T2, O2: Ordering>( self, other: Stream<T2, L, B, O2, R>, ) -> Stream<(T, T2), L, B, NoOrder, R>
Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all tupled pairs in a non-deterministic order.
§Example
let tick = process.tick();
let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
let stream2 = process.source_iter(q!(vec![1, 2, 3]));
stream1.cross_product(stream2)
Sourcepub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
Takes one stream as input and filters out any duplicate occurrences. The output contains all unique values from the input.
§Example
let tick = process.tick();
process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
Sourcepub fn filter_not_in<O2: Ordering>(
self,
other: Stream<T, L, Bounded, O2, R>,
) -> Stream<T, L, Bounded, O, R>
pub fn filter_not_in<O2: Ordering>( self, other: Stream<T, L, Bounded, O2, R>, ) -> Stream<T, L, Bounded, O, R>
Outputs everything in this stream that is not contained in the other
stream.
The other
stream must be Bounded
, since this function will wait until
all its elements are available before producing any output.
§Example
let tick = process.tick();
let stream = process
.source_iter(q!(vec![ 1, 2, 3, 4 ]))
.batch(&tick, nondet!(/** test */));
let batch = process
.source_iter(q!(vec![1, 2]))
.batch(&tick, nondet!(/** test */));
stream.filter_not_in(batch).all_ticks()
Sourcepub fn inspect<F>(
self,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<T, L, B, O, R>
pub fn inspect<F>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<T, L, B, O, R>
An operator which allows you to “inspect” each element of a stream without
modifying it. The closure f
is called on a reference to each item. This is
mainly useful for debugging, and should not be used to generate side-effects.
§Example
let nums = process.source_iter(q!(vec![1, 2]));
// prints "1 * 10 = 10" and "2 * 10 = 20"
nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
Sourcepub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R>
pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R>
An operator which allows you to “name” a HydroNode
.
This is only used for testing, to correlate certain HydroNode
s with IDs.
Sourcepub fn assume_ordering<O2: Ordering>(
self,
_nondet: NonDet,
) -> Stream<T, L, B, O2, R>
pub fn assume_ordering<O2: Ordering>( self, _nondet: NonDet, ) -> Stream<T, L, B, O2, R>
Explicitly “casts” the stream to a type with a different ordering guarantee. Useful in unsafe code where the ordering cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided ordering guarantee will propagate into the guarantees for the rest of the program.
Sourcepub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R>
pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R>
Weakens the ordering guarantee provided by the stream to NoOrder
,
which is always safe because that is the weakest possible guarantee.
Sourcepub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(
self,
) -> Stream<T, L, B, O2, R>
pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>( self, ) -> Stream<T, L, B, O2, R>
Weakens the ordering guarantee provided by the stream to O2
, with the type-system
enforcing that O2
is weaker than the input ordering guarantee.
Sourcepub fn assume_retries<R2: Retries>(
self,
_nondet: NonDet,
) -> Stream<T, L, B, O, R2>
pub fn assume_retries<R2: Retries>( self, _nondet: NonDet, ) -> Stream<T, L, B, O, R2>
Explicitly “casts” the stream to a type with a different retries guarantee. Useful in unsafe code where the lack of retries cannot be proven by the type-system.
§Non-Determinism
This function is used as an escape hatch, and any mistakes in the provided retries guarantee will propagate into the guarantees for the rest of the program.
Sourcepub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce>
pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce>
Weakens the retries guarantee provided by the stream to AtLeastOnce
,
which is always safe because that is the weakest possible guarantee.
Sourcepub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(
self,
) -> Stream<T, L, B, O, R2>
pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>( self, ) -> Stream<T, L, B, O, R2>
Weakens the retries guarantee provided by the stream to R2
, with the type-system
enforcing that R2
is weaker than the input retries guarantee.
Source§impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>where
L: Location<'a>,
Sourcepub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2>
pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2>
Given a stream with ExactlyOnce
retry guarantees, weakens it to an arbitrary guarantee
R2
, which is safe because all guarantees are equal to or weaker than ExactlyOnce
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>where
L: Location<'a>,
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>where
L: Location<'a>,
Sourcepub fn fold_commutative_idempotent<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Singleton<A, L, B>
pub fn fold_commutative_idempotent<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
Combines elements of the stream into a Singleton
, by starting with an initial value,
generated by the init
closure, and then applying the comb
closure to each element in the stream.
Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be commutative AND idempotent, as the order of input items is not guaranteed
and there may be duplicates.
§Example
let tick = process.tick();
let bools = process.source_iter(q!(vec![false, true, false]));
let batch = bools.batch(&tick, nondet!(/** test */));
batch
.fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.all_ticks()
// true
Sourcepub fn reduce_commutative_idempotent<F>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Optional<T, L, B>
pub fn reduce_commutative_idempotent<F>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
Combines elements of the stream into an Optional
, by starting with the first element in the stream,
and then applying the comb
closure to each element in the stream. The Optional
will be empty
until the first element in the input arrives. Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be commutative AND idempotent, as the order of input items is not guaranteed
and there may be duplicates.
§Example
let tick = process.tick();
let bools = process.source_iter(q!(vec![false, true, false]));
let batch = bools.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
.all_ticks()
// true
Sourcepub fn max(self) -> Optional<T, L, B>where
T: Ord,
pub fn max(self) -> Optional<T, L, B>where
T: Ord,
Computes the maximum element in the stream as an Optional
, which
will be empty until the first element in the input arrives.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.max().all_ticks()
// 4
Sourcepub fn max_by_key<K, F>(
self,
key: impl IntoQuotedMut<'a, F, L> + Copy,
) -> Optional<T, L, B>
pub fn max_by_key<K, F>( self, key: impl IntoQuotedMut<'a, F, L> + Copy, ) -> Optional<T, L, B>
Computes the maximum element in the stream as an Optional
, where the
maximum is determined according to the key
function. The Optional
will
be empty until the first element in the input arrives.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.max_by_key(q!(|x| -x)).all_ticks()
// 1
Sourcepub fn min(self) -> Optional<T, L, B>where
T: Ord,
pub fn min(self) -> Optional<T, L, B>where
T: Ord,
Computes the minimum element in the stream as an Optional
, which
will be empty until the first element in the input arrives.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.min().all_ticks()
// 1
Source§impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>where
L: Location<'a>,
Sourcepub fn fold_commutative<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Singleton<A, L, B>
pub fn fold_commutative<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
Combines elements of the stream into a Singleton
, by starting with an initial value,
generated by the init
closure, and then applying the comb
closure to each element in the stream.
Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be commutative, as the order of input items is not guaranteed.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
.all_ticks()
// 10
Sourcepub fn reduce_commutative<F>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Optional<T, L, B>
pub fn reduce_commutative<F>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
Combines elements of the stream into a Optional
, by starting with the first element in the stream,
and then applying the comb
closure to each element in the stream. The Optional
will be empty
until the first element in the input arrives. Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be commutative, as the order of input items is not guaranteed.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_commutative(q!(|curr, new| *curr += new))
.all_ticks()
// 10
Source§impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>where
L: Location<'a>,
Sourcepub fn fold_idempotent<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Singleton<A, L, B>
pub fn fold_idempotent<A, I, F>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
Combines elements of the stream into a Singleton
, by starting with an initial value,
generated by the init
closure, and then applying the comb
closure to each element in the stream.
Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be idempotent, as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let bools = process.source_iter(q!(vec![false, true, false]));
let batch = bools.batch(&tick, nondet!(/** test */));
batch
.fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.all_ticks()
// true
Sourcepub fn reduce_idempotent<F>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Optional<T, L, B>
pub fn reduce_idempotent<F>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
Combines elements of the stream into an Optional
, by starting with the first element in the stream,
and then applying the comb
closure to each element in the stream. The Optional
will be empty
until the first element in the input arrives. Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The comb
closure must be idempotent, as there may be non-deterministic duplicates.
§Example
let tick = process.tick();
let bools = process.source_iter(q!(vec![false, true, false]));
let batch = bools.batch(&tick, nondet!(/** test */));
batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
// true
Sourcepub fn first(self) -> Optional<T, L, B>
pub fn first(self) -> Optional<T, L, B>
Computes the first element in the stream as an Optional
, which
will be empty until the first element in the input arrives.
This requires the stream to have a TotalOrder
guarantee, otherwise
re-ordering of elements may cause the first element to change.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.first().all_ticks()
// 1
Sourcepub fn last(self) -> Optional<T, L, B>
pub fn last(self) -> Optional<T, L, B>
Computes the last element in the stream as an Optional
, which
will be empty until an element in the input arrives.
This requires the stream to have a TotalOrder
guarantee, otherwise
re-ordering of elements may cause the last element to change.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.last().all_ticks()
// 4
Source§impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>where
L: Location<'a>,
Sourcepub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce>
pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce>
Returns a stream with the current count tupled with each element in the input stream.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
numbers.enumerate()
// (0, 1), (1, 2), (2, 3), (3, 4)
Sourcepub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Singleton<A, L, B>
pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I, L>, comb: impl IntoQuotedMut<'a, F, L>, ) -> Singleton<A, L, B>
Combines elements of the stream into a Singleton
, by starting with an intitial value,
generated by the init
closure, and then applying the comb
closure to each element in the stream.
Unlike iterators, comb
takes the accumulator by &mut
reference, so that it can be modified in place.
The input stream must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
§Example
let tick = process.tick();
let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
let batch = words.batch(&tick, nondet!(/** test */));
batch
.fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
.all_ticks()
// "HELLOWORLD"
Sourcepub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
Collects all the elements of this stream into a single Vec
element.
If the input stream is Unbounded
, the output Singleton
will be Unbounded
as
well, which means that the value of the Vec
will asynchronously grow as new elements
are added. On such a value, you can use Singleton::snapshot
to grab an instance of
the vector at an arbitrary point in time.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
// [ vec![1, 2, 3, 4] ]
Sourcepub fn scan<A, U, I, F>(
self,
init: impl IntoQuotedMut<'a, I, L>,
f: impl IntoQuotedMut<'a, F, L>,
) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
pub fn scan<A, U, I, F>( self, init: impl IntoQuotedMut<'a, I, L>, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
Applies a function to each element of the stream, maintaining an internal state (accumulator) and emitting each intermediate result.
Unlike fold
which only returns the final accumulated value, scan
produces a new stream
containing all intermediate accumulated values. The scan operation can also terminate early
by returning None
.
The function takes a mutable reference to the accumulator and the current element, and returns
an Option<U>
. If the function returns Some(value)
, value
is emitted to the output stream.
If the function returns None
, the stream is terminated and no more elements are processed.
§Examples
Basic usage - running sum:
process.source_iter(q!(vec![1, 2, 3, 4])).scan(
q!(|| 0),
q!(|acc, x| {
*acc += x;
Some(*acc)
}),
)
// Output: 1, 3, 6, 10
Early termination example:
process.source_iter(q!(vec![1, 2, 3, 4])).scan(
q!(|| 1),
q!(|state, x| {
*state = *state * x;
if *state > 6 {
None // Terminate the stream
} else {
Some(-*state)
}
}),
)
// Output: -1, -2, -6
Sourcepub fn reduce<F: Fn(&mut T, T) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> Optional<T, L, B>
pub fn reduce<F: Fn(&mut T, T) + 'a>( self, comb: impl IntoQuotedMut<'a, F, L>, ) -> Optional<T, L, B>
Combines elements of the stream into an Optional
, by starting with the first element in the stream,
and then applying the comb
closure to each element in the stream. The Optional
will be empty
until the first element in the input arrives.
The input stream must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
§Example
let tick = process.tick();
let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
let batch = words.batch(&tick, nondet!(/** test */));
batch
.map(q!(|x| x.to_string()))
.reduce(q!(|curr, new| curr.push_str(&new)))
.all_ticks()
// "HELLOWORLD"
Source§impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R>
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R>
Sourcepub fn interleave<O2: Ordering, R2: Retries>(
self,
other: Stream<T, L, Unbounded, O2, R2>,
) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>where
R: MinRetries<R2>,
pub fn interleave<O2: Ordering, R2: Retries>(
self,
other: Stream<T, L, Unbounded, O2, R2>,
) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>where
R: MinRetries<R2>,
Produces a new stream that interleaves the elements of the two input streams.
The result has NoOrder
because the order of interleaving is not guaranteed.
Currently, both input streams must be Unbounded
. When the streams are
Bounded
, you can use Stream::chain
instead.
§Example
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
// 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
Source§impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>where
L: Location<'a>,
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>where
L: Location<'a>,
Sourcepub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>where
T: Ord,
pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>where
T: Ord,
Produces a new stream that emits the input elements in sorted order.
The input stream can have any ordering guarantee, but the output stream
will have a TotalOrder
guarantee. This operator will block until all
elements in the input stream are available, so it requires the input stream
to be Bounded
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.sort().all_ticks()
// 1, 2, 3, 4
Sourcepub fn chain<O2: Ordering, R2: Retries>(
self,
other: Stream<T, L, Bounded, O2, R2>,
) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>where
O: MinOrder<O2>,
R: MinRetries<R2>,
pub fn chain<O2: Ordering, R2: Retries>(
self,
other: Stream<T, L, Bounded, O2, R2>,
) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>where
O: MinOrder<O2>,
R: MinRetries<R2>,
Produces a new stream that first emits the elements of the self
stream,
and then emits the elements of the other
stream. The output stream has
a TotalOrder
guarantee if and only if both input streams have a
TotalOrder
guarantee.
Currently, both input streams must be Bounded
. This operator will block
on the first stream until all its elements are available. In a future version,
we will relax the requirement on the other
stream.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
// 2, 3, 4, 5, 1, 2, 3, 4
Sourcepub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
self,
other: Stream<T2, L, Bounded, O2, R>,
) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>( self, other: Stream<T2, L, Bounded, O2, R>, ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
Unlike Stream::cross_product
, the output order is totally ordered when the inputs are
because this is compiled into a nested loop.
Sourcepub fn repeat_with_keys<K, V2>(
self,
keys: KeyedSingleton<K, V2, L, Bounded>,
) -> KeyedStream<K, T, L, Bounded, O, R>
pub fn repeat_with_keys<K, V2>( self, keys: KeyedSingleton<K, V2, L, Bounded>, ) -> KeyedStream<K, T, L, Bounded, O, R>
Creates a KeyedStream
with the same set of keys as keys
, but with the elements in
self
used as the values for each key.
This is helpful when “broadcasting” a set of values so that all the keys have the same
values. For example, it can be used to send the same set of elements to several cluster
members, if the membership information is available as a KeyedSingleton
.
§Example
let keyed_singleton = // { 1: (), 2: () }
let stream = // [ "a", "b" ]
stream.repeat_with_keys(keyed_singleton)
// { 1: ["a", "b" ], 2: ["a", "b"] }
Source§impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>where
L: Location<'a>,
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>where
L: Location<'a>,
Sourcepub fn join<V2, O2: Ordering, R2: Retries>(
self,
n: Stream<(K, V2), L, B, O2, R2>,
) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
pub fn join<V2, O2: Ordering, R2: Retries>( self, n: Stream<(K, V2), L, B, O2, R2>, ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
Given two streams of pairs (K, V1)
and (K, V2)
, produces a new stream of nested pairs (K, (V1, V2))
by equi-joining the two streams on the key attribute K
.
§Example
let tick = process.tick();
let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
stream1.join(stream2)
// (1, ('a', 'x')), (2, ('b', 'y'))
Sourcepub fn anti_join<O2: Ordering, R2: Retries>(
self,
n: Stream<K, L, Bounded, O2, R2>,
) -> Stream<(K, V1), L, B, O, R>
pub fn anti_join<O2: Ordering, R2: Retries>( self, n: Stream<K, L, Bounded, O2, R2>, ) -> Stream<(K, V1), L, B, O, R>
Given a stream of pairs (K, V1)
and a bounded stream of keys K
,
computes the anti-join of the items in the input – i.e. returns
unique items in the first input that do not have a matching key
in the second input.
§Example
let tick = process.tick();
let stream = process
.source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
.batch(&tick, nondet!(/** test */));
let batch = process
.source_iter(q!(vec![1, 2]))
.batch(&tick, nondet!(/** test */));
stream.anti_join(batch).all_ticks()
Source§impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V), L, B, O, R>
impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V), L, B, O, R>
Sourcepub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R>
pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R>
Transforms this stream into a KeyedStream
, where the first element of each tuple
is used as the key and the second element is added to the entries associated with that key.
Because KeyedStream
lazily groups values into buckets, this operator has zero computational
cost and does not require that the key type is hashable. Keyed streams are useful for
performing grouped aggregations, but also for more precise ordering guarantees such as
total ordering within each group but no ordering across groups.
§Example
process
.source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
.into_keyed()
// { 1: [2, 3], 2: [4] }
Source§impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
Sourcepub fn fold_keyed<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, Tick<L>>,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().fold(…) instead
pub fn fold_keyed<A, I, F>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::fold
, in the spirit of SQL’s GROUP BY and aggregation constructs. The input
tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The input stream must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
If the input and output value types are the same and do not require initialization then use
Stream::reduce_keyed
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
.all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce_keyed<F>(
self,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().reduce(…) instead
pub fn reduce_keyed<F>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::reduce
, in the spirit of SQL’s GROUP BY and aggregation constructs. The input
tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The input stream must have a TotalOrder
guarantee, which means that the comb
closure is allowed
to depend on the order of elements in the stream.
If you need the accumulated value to have a different type than the input, use Stream::fold_keyed
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
// (1, 5), (2, 7)
Source§impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
Sourcepub fn fold_keyed_commutative_idempotent<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, Tick<L>>,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().fold_commutative_idempotent(…) instead
pub fn fold_keyed_commutative_idempotent<A, I, F>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::fold_commutative_idempotent
, in the spirit of SQL’s GROUP BY and aggregation constructs.
The input tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed, and idempotent,
as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use
Stream::reduce_keyed_commutative_idempotent
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.all_ticks()
// (1, false), (2, true)
Sourcepub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce>
pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce>
Given a stream of pairs (K, V)
, produces a new stream of unique keys K
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch.keys().all_ticks()
// 1, 2
Sourcepub fn reduce_keyed_commutative_idempotent<F>(
self,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().reduce_commutative_idempotent(…) instead
pub fn reduce_keyed_commutative_idempotent<F>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::reduce_commutative_idempotent
, in the spirit of SQL’s GROUP BY and aggregation constructs.
The input tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed, and idempotent,
as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use Stream::fold_keyed_commutative_idempotent
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
.all_ticks()
// (1, false), (2, true)
Source§impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
Sourcepub fn fold_keyed_commutative<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, Tick<L>>,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().fold_commutative(…) instead
pub fn fold_keyed_commutative<A, I, F>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::fold_commutative
, in the spirit of SQL’s GROUP BY and aggregation constructs. The input
tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed.
If the input and output value types are the same and do not require initialization then use
Stream::reduce_keyed_commutative
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
.all_ticks()
// (1, 5), (2, 7)
Sourcepub fn reduce_keyed_commutative<F>(
self,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().reduce_commutative(…) instead
pub fn reduce_keyed_commutative<F>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::reduce_commutative
, in the spirit of SQL’s GROUP BY and aggregation constructs. The input
tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be commutative, as the order of input items is not guaranteed.
If you need the accumulated value to have a different type than the input, use Stream::fold_keyed_commutative
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_keyed_commutative(q!(|acc, x| *acc += x))
.all_ticks()
// (1, 5), (2, 7)
Source§impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
Sourcepub fn fold_keyed_idempotent<A, I, F>(
self,
init: impl IntoQuotedMut<'a, I, Tick<L>>,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().fold_idempotent(…) instead
pub fn fold_keyed_idempotent<A, I, F>( self, init: impl IntoQuotedMut<'a, I, Tick<L>>, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::fold_idempotent
, in the spirit of SQL’s GROUP BY and aggregation constructs.
The input tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be idempotent as there may be non-deterministic duplicates.
If the input and output value types are the same and do not require initialization then use
Stream::reduce_keyed_idempotent
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
.all_ticks()
// (1, false), (2, true)
Sourcepub fn reduce_keyed_idempotent<F>(
self,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
👎Deprecated: use .into_keyed().reduce_idempotent(…) instead
pub fn reduce_keyed_idempotent<F>( self, comb: impl IntoQuotedMut<'a, F, Tick<L>>, ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
A special case of Stream::reduce_idempotent
, in the spirit of SQL’s GROUP BY and aggregation constructs.
The input tuples are partitioned into groups by the first element (“keys”), and for each group the values
in the second element are accumulated via the comb
closure.
The comb
closure must be idempotent, as there may be non-deterministic duplicates.
If you need the accumulated value to have a different type than the input, use Stream::fold_keyed_idempotent
.
§Example
let tick = process.tick();
let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
let batch = numbers.batch(&tick, nondet!(/** test */));
batch
.reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
.all_ticks()
// (1, false), (2, true)
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
Sourcepub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R>
pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R>
Returns a stream corresponding to the latest batch of elements being atomically processed. These batches are guaranteed to be contiguous across ticks and preserve the order of the input.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
Sourcepub fn end_atomic(self) -> Stream<T, L, B, O, R>
pub fn end_atomic(self) -> Stream<T, L, B, O, R>
Yields the elements of this stream back into a top-level, asynchronous execution context.
See Stream::atomic
for more details.
Sourcepub fn atomic_source(&self) -> Tick<L>
pub fn atomic_source(&self) -> Tick<L>
Gets the Tick
inside which this stream is synchronously processed. See Stream::atomic
.
Source§impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
Sourcepub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R>
pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R>
Shifts this stream into an atomic context, which guarantees that any downstream logic
will all be executed synchronously before any outputs are yielded (in Stream::end_atomic
).
This is useful to enforce local consistency constraints, such as ensuring that a write is
processed before an acknowledgement is emitted. Entering an atomic section requires a Tick
argument that declares where the stream will be atomically processed. Batching a stream into
the same Tick
will preserve the synchronous execution, while batching into a different
Tick
will introduce asynchrony.
Sourcepub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>where
T: Future<Output = T2>,
pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>where
T: Future<Output = T2>,
Consumes a stream of Future<T>
, produces a new stream of the resulting T
outputs.
Future outputs are produced as available, regardless of input arrival order.
§Example
process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
.map(q!(|x| async move {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
x
}))
.resolve_futures()
// 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
Sourcepub fn batch(
self,
tick: &Tick<L>,
nondet: NonDet,
) -> Stream<T, Tick<L>, Bounded, O, R>
pub fn batch( self, tick: &Tick<L>, nondet: NonDet, ) -> Stream<T, Tick<L>, Bounded, O, R>
Given a tick, returns a stream corresponding to a batch of elements segmented by
that tick. These batches are guaranteed to be contiguous across ticks and preserve
the order of the input. The output stream will execute in the Tick
that was
used to create the atomic section.
§Non-Determinism
The batch boundaries are non-deterministic and may change across executions.
Sourcepub fn sample_every(
self,
interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a,
nondet: NonDet,
) -> Stream<T, L, Unbounded, O, AtLeastOnce>
pub fn sample_every( self, interval: impl QuotedWithContext<'a, Duration, L> + Copy + 'a, nondet: NonDet, ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
Given a time interval, returns a stream corresponding to samples taken from the stream roughly at that interval. The output will have elements in the same order as the input, but with arbitrary elements skipped between samples. There is also no guarantee on the exact timing of the samples.
§Non-Determinism
The output stream is non-deterministic in which elements are sampled, since this is controlled by a clock.
Sourcepub fn timeout(
self,
duration: impl QuotedWithContext<'a, Duration, Tick<L>> + Copy + 'a,
nondet: NonDet,
) -> Optional<(), L, Unbounded>
pub fn timeout( self, duration: impl QuotedWithContext<'a, Duration, Tick<L>> + Copy + 'a, nondet: NonDet, ) -> Optional<(), L, Unbounded>
Given a timeout duration, returns an Optional
which will have a value if the
stream has not emitted a value since that duration.
§Non-Determinism
Timeout relies on non-deterministic sampling of the stream, so depending on when
samples take place, timeouts may be non-deterministically generated or missed,
and the notification of the timeout may be delayed as well. There is also no
guarantee on how long the Optional
will have a value after the timeout is
detected based on when the next sample is taken.
Source§impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
Sourcepub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R>
pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R>
Consumes a stream of Future<T>
, produces a new stream of the resulting T
outputs.
Future outputs are produced in the same order as the input stream.
§Example
process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
.map(q!(|x| async move {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
x
}))
.resolve_futures_ordered()
// 2, 3, 1, 9, 6, 5, 4, 7, 8
Source§impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
Sourcepub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
Executes the provided closure for every element in this stream.
Because the closure may have side effects, the stream must have deterministic order
(TotalOrder
) and no retries (ExactlyOnce
). If the side effects can tolerate
out-of-order or duplicate execution, use Stream::assume_ordering
and
Stream::assume_retries
with an explanation for why this is the case.
Sourcepub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)where
S: 'a + Sink<T> + Unpin,
pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)where
S: 'a + Sink<T> + Unpin,
Sends all elements of this stream to a provided [futures::Sink
], such as an external
TCP socket to some other server. You should not use this API for interacting with
external clients, instead see Location::bidi_external_many_bytes
and
Location::bidi_external_many_bincode
. This should be used for custom, low-level
interaction with asynchronous sinks.
Source§impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>where
L: Location<'a>,
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>where
L: Location<'a>,
Sourcepub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R>
pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R>
Asynchronously yields this batch of elements outside the tick as an unbounded stream, which will stream all the elements across all tick iterations by concatenating the batches.
Sourcepub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R>
pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R>
Synchronously yields this batch of elements outside the tick as an unbounded stream, which will stream all the elements across all tick iterations by concatenating the batches.
Unlike Stream::all_ticks
, this preserves synchronous execution, as the output stream
is emitted in an Atomic
context that will process elements synchronously with the input
stream’s Tick
context.
Sourcepub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>where
T: Clone,
pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>where
T: Clone,
Accumulates the elements of this stream across ticks by concatenating them together.
The output stream in tick T will contain the elements of the input at tick 0, 1, …, up to and including tick T. This is useful for accumulating streaming inputs across ticks, but be careful when using this operator, as its memory usage will grow linearly over time since it must store its inputs indefinitely.
§Example
let tick = process.tick();
// ticks are lazy by default, forces the second tick to run
tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
let batch_first_tick = process
.source_iter(q!(vec![1, 2, 3, 4]))
.batch(&tick, nondet!(/** test */));
let batch_second_tick = process
.source_iter(q!(vec![5, 6, 7, 8]))
.batch(&tick, nondet!(/** test */))
.defer_tick(); // appears on the second tick
batch_first_tick.chain(batch_second_tick)
.persist()
.all_ticks()
// [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R>
Trait Implementations§
Source§impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>where
L: Location<'a>,
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>where
L: Location<'a>,
fn defer_tick(self) -> Self
Source§impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O>> for Stream<T, L, B, O, AtLeastOnce>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O>> for Stream<T, L, B, O, AtLeastOnce>where
L: Location<'a>,
Source§fn from(
stream: Stream<T, L, B, O, ExactlyOnce>,
) -> Stream<T, L, B, O, AtLeastOnce>
fn from( stream: Stream<T, L, B, O, ExactlyOnce>, ) -> Stream<T, L, B, O, AtLeastOnce>
Source§impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>where
L: Location<'a>,
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>where
L: Location<'a>,
Auto Trait Implementations§
impl<Type, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Freeze for Stream<Type, Loc, Bound, Order, Retry>
impl<Type, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !RefUnwindSafe for Stream<Type, Loc, Bound, Order, Retry>
impl<Type, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Send for Stream<Type, Loc, Bound, Order, Retry>
impl<Type, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !Sync for Stream<Type, Loc, Bound, Order, Retry>
impl<Type, Loc, Bound, Order, Retry> Unpin for Stream<Type, Loc, Bound, Order, Retry>
impl<Type, Loc, Bound, Order = TotalOrder, Retry = ExactlyOnce> !UnwindSafe for Stream<Type, Loc, Bound, Order, Retry>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more