Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45    /// The [`StreamOrder`] corresponding to this type.
46    const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82    /// The weaker of the two orderings.
83    type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88    type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93    type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99    MinRetries<Self, Min = Self>
100    + MinRetries<ExactlyOnce, Min = Self>
101    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103    /// The [`StreamRetry`] corresponding to this type.
104    const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136    /// The weaker of the two retry guarantees.
137    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142    type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147    type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153    label = "required here",
154    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166    label = "required here",
167    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192///   (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196    Type,
197    Loc,
198    Bound: Boundedness = Unbounded,
199    Order: Ordering = TotalOrder,
200    Retry: Retries = ExactlyOnce,
201> {
202    pub(crate) location: Loc,
203    pub(crate) ir_node: RefCell<HydroNode>,
204    pub(crate) flow_state: FlowState,
205
206    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244    for Stream<T, L, B, NoOrder, R>
245where
246    L: Location<'a>,
247{
248    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249        stream.weaken_ordering()
250    }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254    for Stream<T, L, B, O, AtLeastOnce>
255where
256    L: Location<'a>,
257{
258    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259        stream.weaken_retries()
260    }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265    L: Location<'a>,
266{
267    fn defer_tick(self) -> Self {
268        Stream::defer_tick(self)
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
298        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
299            location.clone(),
300            HydroNode::DeferTick {
301                input: Box::new(HydroNode::CycleSource {
302                    cycle_id,
303                    metadata: location.new_node_metadata(Self::collection_kind()),
304                }),
305                metadata: location.new_node_metadata(Self::collection_kind()),
306            },
307        );
308
309        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
310    }
311}
312
313impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
314    for Stream<T, Tick<L>, Bounded, O, R>
315where
316    L: Location<'a>,
317{
318    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
319        assert_eq!(
320            Location::id(&self.location),
321            expected_location,
322            "locations do not match"
323        );
324        self.location
325            .flow_state()
326            .borrow_mut()
327            .push_root(HydroRoot::CycleSink {
328                cycle_id,
329                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
330                op_metadata: HydroIrOpMetadata::new(),
331            });
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
336    for Stream<T, L, B, O, R>
337where
338    L: Location<'a> + NoTick,
339{
340    type Location = L;
341
342    fn create_source(cycle_id: CycleId, location: L) -> Self {
343        Stream::new(
344            location.clone(),
345            HydroNode::CycleSource {
346                cycle_id,
347                metadata: location.new_node_metadata(Self::collection_kind()),
348            },
349        )
350    }
351}
352
353impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
354    for Stream<T, L, B, O, R>
355where
356    L: Location<'a> + NoTick,
357{
358    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
359        assert_eq!(
360            Location::id(&self.location),
361            expected_location,
362            "locations do not match"
363        );
364        self.location
365            .flow_state()
366            .borrow_mut()
367            .push_root(HydroRoot::CycleSink {
368                cycle_id,
369                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
370                op_metadata: HydroIrOpMetadata::new(),
371            });
372    }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
376where
377    T: Clone,
378    L: Location<'a>,
379{
380    fn clone(&self) -> Self {
381        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
382            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
383            *self.ir_node.borrow_mut() = HydroNode::Tee {
384                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
385                metadata: self.location.new_node_metadata(Self::collection_kind()),
386            };
387        }
388
389        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
390            Stream {
391                location: self.location.clone(),
392                flow_state: self.flow_state.clone(),
393                ir_node: HydroNode::Tee {
394                    inner: SharedNode(inner.0.clone()),
395                    metadata: metadata.clone(),
396                }
397                .into(),
398                _phantom: PhantomData,
399            }
400        } else {
401            unreachable!()
402        }
403    }
404}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408    L: Location<'a>,
409{
410    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
411        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
412        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
413
414        let flow_state = location.flow_state().clone();
415        Stream {
416            location,
417            flow_state,
418            ir_node: RefCell::new(ir_node),
419            _phantom: PhantomData,
420        }
421    }
422
423    /// Returns the [`Location`] where this stream is being materialized.
424    pub fn location(&self) -> &L {
425        &self.location
426    }
427
428    pub(crate) fn collection_kind() -> CollectionKind {
429        CollectionKind::Stream {
430            bound: B::BOUND_KIND,
431            order: O::ORDERING_KIND,
432            retry: R::RETRIES_KIND,
433            element_type: quote_type::<T>().into(),
434        }
435    }
436
437    /// Produces a stream based on invoking `f` on each element.
438    /// If you do not want to modify the stream and instead only want to view
439    /// each item use [`Stream::inspect`] instead.
440    ///
441    /// # Example
442    /// ```rust
443    /// # #[cfg(feature = "deploy")] {
444    /// # use hydro_lang::prelude::*;
445    /// # use futures::StreamExt;
446    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
447    /// let words = process.source_iter(q!(vec!["hello", "world"]));
448    /// words.map(q!(|x| x.to_uppercase()))
449    /// # }, |mut stream| async move {
450    /// # for w in vec!["HELLO", "WORLD"] {
451    /// #     assert_eq!(stream.next().await.unwrap(), w);
452    /// # }
453    /// # }));
454    /// # }
455    /// ```
456    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
457    where
458        F: Fn(T) -> U + 'a,
459    {
460        let f = f.splice_fn1_ctx(&self.location).into();
461        Stream::new(
462            self.location.clone(),
463            HydroNode::Map {
464                f,
465                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466                metadata: self
467                    .location
468                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
469            },
470        )
471    }
472
473    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
474    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
475    /// for the output type `U` must produce items in a **deterministic** order.
476    ///
477    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
478    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
479    ///
480    /// # Example
481    /// ```rust
482    /// # #[cfg(feature = "deploy")] {
483    /// # use hydro_lang::prelude::*;
484    /// # use futures::StreamExt;
485    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
486    /// process
487    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
488    ///     .flat_map_ordered(q!(|x| x))
489    /// # }, |mut stream| async move {
490    /// // 1, 2, 3, 4
491    /// # for w in (1..5) {
492    /// #     assert_eq!(stream.next().await.unwrap(), w);
493    /// # }
494    /// # }));
495    /// # }
496    /// ```
497    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
498    where
499        I: IntoIterator<Item = U>,
500        F: Fn(T) -> I + 'a,
501    {
502        let f = f.splice_fn1_ctx(&self.location).into();
503        Stream::new(
504            self.location.clone(),
505            HydroNode::FlatMap {
506                f,
507                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
508                metadata: self
509                    .location
510                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
511            },
512        )
513    }
514
515    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
516    /// for the output type `U` to produce items in any order.
517    ///
518    /// # Example
519    /// ```rust
520    /// # #[cfg(feature = "deploy")] {
521    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
522    /// # use futures::StreamExt;
523    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
524    /// process
525    ///     .source_iter(q!(vec![
526    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
527    ///         std::collections::HashSet::from_iter(vec![3, 4]),
528    ///     ]))
529    ///     .flat_map_unordered(q!(|x| x))
530    /// # }, |mut stream| async move {
531    /// // 1, 2, 3, 4, but in no particular order
532    /// # let mut results = Vec::new();
533    /// # for w in (1..5) {
534    /// #     results.push(stream.next().await.unwrap());
535    /// # }
536    /// # results.sort();
537    /// # assert_eq!(results, vec![1, 2, 3, 4]);
538    /// # }));
539    /// # }
540    /// ```
541    pub fn flat_map_unordered<U, I, F>(
542        self,
543        f: impl IntoQuotedMut<'a, F, L>,
544    ) -> Stream<U, L, B, NoOrder, R>
545    where
546        I: IntoIterator<Item = U>,
547        F: Fn(T) -> I + 'a,
548    {
549        let f = f.splice_fn1_ctx(&self.location).into();
550        Stream::new(
551            self.location.clone(),
552            HydroNode::FlatMap {
553                f,
554                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
555                metadata: self
556                    .location
557                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
558            },
559        )
560    }
561
562    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
563    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
564    ///
565    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
566    /// not deterministic, use [`Stream::flatten_unordered`] instead.
567    ///
568    /// ```rust
569    /// # #[cfg(feature = "deploy")] {
570    /// # use hydro_lang::prelude::*;
571    /// # use futures::StreamExt;
572    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573    /// process
574    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
575    ///     .flatten_ordered()
576    /// # }, |mut stream| async move {
577    /// // 1, 2, 3, 4
578    /// # for w in (1..5) {
579    /// #     assert_eq!(stream.next().await.unwrap(), w);
580    /// # }
581    /// # }));
582    /// # }
583    /// ```
584    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
585    where
586        T: IntoIterator<Item = U>,
587    {
588        self.flat_map_ordered(q!(|d| d))
589    }
590
591    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
592    /// for the element type `T` to produce items in any order.
593    ///
594    /// # Example
595    /// ```rust
596    /// # #[cfg(feature = "deploy")] {
597    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
598    /// # use futures::StreamExt;
599    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
600    /// process
601    ///     .source_iter(q!(vec![
602    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
603    ///         std::collections::HashSet::from_iter(vec![3, 4]),
604    ///     ]))
605    ///     .flatten_unordered()
606    /// # }, |mut stream| async move {
607    /// // 1, 2, 3, 4, but in no particular order
608    /// # let mut results = Vec::new();
609    /// # for w in (1..5) {
610    /// #     results.push(stream.next().await.unwrap());
611    /// # }
612    /// # results.sort();
613    /// # assert_eq!(results, vec![1, 2, 3, 4]);
614    /// # }));
615    /// # }
616    /// ```
617    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
618    where
619        T: IntoIterator<Item = U>,
620    {
621        self.flat_map_unordered(q!(|d| d))
622    }
623
624    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
625    /// then emit the elements of that stream one by one. When the inner stream yields
626    /// `Pending`, this operator yields as well.
627    pub fn flat_map_stream_blocking<U, S, F>(
628        self,
629        f: impl IntoQuotedMut<'a, F, L>,
630    ) -> Stream<U, L, B, O, R>
631    where
632        S: futures::Stream<Item = U>,
633        F: Fn(T) -> S + 'a,
634    {
635        let f = f.splice_fn1_ctx(&self.location).into();
636        Stream::new(
637            self.location.clone(),
638            HydroNode::FlatMapStreamBlocking {
639                f,
640                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
641                metadata: self
642                    .location
643                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
644            },
645        )
646    }
647
648    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
649    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
650    /// yields as well.
651    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
652    where
653        T: futures::Stream<Item = U>,
654    {
655        self.flat_map_stream_blocking(q!(|d| d))
656    }
657
658    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
659    /// `f`, preserving the order of the elements.
660    ///
661    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
662    /// not modify or take ownership of the values. If you need to modify the values while filtering
663    /// use [`Stream::filter_map`] instead.
664    ///
665    /// # Example
666    /// ```rust
667    /// # #[cfg(feature = "deploy")] {
668    /// # use hydro_lang::prelude::*;
669    /// # use futures::StreamExt;
670    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
671    /// process
672    ///     .source_iter(q!(vec![1, 2, 3, 4]))
673    ///     .filter(q!(|&x| x > 2))
674    /// # }, |mut stream| async move {
675    /// // 3, 4
676    /// # for w in (3..5) {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// # }
681    /// ```
682    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
683    where
684        F: Fn(&T) -> bool + 'a,
685    {
686        let f = f.splice_fn1_borrow_ctx(&self.location).into();
687        Stream::new(
688            self.location.clone(),
689            HydroNode::Filter {
690                f,
691                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692                metadata: self.location.new_node_metadata(Self::collection_kind()),
693            },
694        )
695    }
696
697    /// Splits the stream into two streams based on a predicate, without cloning elements.
698    ///
699    /// Elements for which `f` returns `true` are sent to the first output stream,
700    /// and elements for which `f` returns `false` are sent to the second output stream.
701    ///
702    /// Unlike using `filter` twice, this only evaluates the predicate once per element
703    /// and does not require `T: Clone`.
704    ///
705    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
706    /// the predicate is only used for routing; the element itself is moved to the
707    /// appropriate output stream.
708    ///
709    /// # Example
710    /// ```rust
711    /// # #[cfg(feature = "deploy")] {
712    /// # use hydro_lang::prelude::*;
713    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
716    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
717    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
718    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
719    /// evens.map(q!(|x| (x, true)))
720    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
721    /// # }, |mut stream| async move {
722    /// # let mut results = Vec::new();
723    /// # for _ in 0..6 {
724    /// #     results.push(stream.next().await.unwrap());
725    /// # }
726    /// # results.sort();
727    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
728    /// # }));
729    /// # }
730    /// ```
731    #[expect(
732        clippy::type_complexity,
733        reason = "return type mirrors the input stream type"
734    )]
735    pub fn partition<F>(
736        self,
737        f: impl IntoQuotedMut<'a, F, L>,
738    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
739    where
740        F: Fn(&T) -> bool + 'a,
741    {
742        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
743        let shared = SharedNode(Rc::new(RefCell::new(
744            self.ir_node.replace(HydroNode::Placeholder),
745        )));
746
747        let true_stream = Stream::new(
748            self.location.clone(),
749            HydroNode::Partition {
750                inner: SharedNode(shared.0.clone()),
751                f: f.clone(),
752                is_true: true,
753                metadata: self.location.new_node_metadata(Self::collection_kind()),
754            },
755        );
756
757        let false_stream = Stream::new(
758            self.location.clone(),
759            HydroNode::Partition {
760                inner: SharedNode(shared.0),
761                f,
762                is_true: false,
763                metadata: self.location.new_node_metadata(Self::collection_kind()),
764            },
765        );
766
767        (true_stream, false_stream)
768    }
769
770    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
771    ///
772    /// # Example
773    /// ```rust
774    /// # #[cfg(feature = "deploy")] {
775    /// # use hydro_lang::prelude::*;
776    /// # use futures::StreamExt;
777    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778    /// process
779    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
780    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
781    /// # }, |mut stream| async move {
782    /// // 1, 2
783    /// # for w in (1..3) {
784    /// #     assert_eq!(stream.next().await.unwrap(), w);
785    /// # }
786    /// # }));
787    /// # }
788    /// ```
789    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
790    where
791        F: Fn(T) -> Option<U> + 'a,
792    {
793        let f = f.splice_fn1_ctx(&self.location).into();
794        Stream::new(
795            self.location.clone(),
796            HydroNode::FilterMap {
797                f,
798                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
799                metadata: self
800                    .location
801                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
802            },
803        )
804    }
805
806    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
807    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
808    /// If `other` is an empty [`Optional`], no values will be produced.
809    ///
810    /// # Example
811    /// ```rust
812    /// # #[cfg(feature = "deploy")] {
813    /// # use hydro_lang::prelude::*;
814    /// # use futures::StreamExt;
815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816    /// let tick = process.tick();
817    /// let batch = process
818    ///   .source_iter(q!(vec![1, 2, 3, 4]))
819    ///   .batch(&tick, nondet!(/** test */));
820    /// let count = batch.clone().count(); // `count()` returns a singleton
821    /// batch.cross_singleton(count).all_ticks()
822    /// # }, |mut stream| async move {
823    /// // (1, 4), (2, 4), (3, 4), (4, 4)
824    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn cross_singleton<O2>(
831        self,
832        other: impl Into<Optional<O2, L, Bounded>>,
833    ) -> Stream<(T, O2), L, B, O, R>
834    where
835        O2: Clone,
836    {
837        let other: Optional<O2, L, Bounded> = other.into();
838        check_matching_location(&self.location, &other.location);
839
840        Stream::new(
841            self.location.clone(),
842            HydroNode::CrossSingleton {
843                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
844                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
845                metadata: self
846                    .location
847                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
848            },
849        )
850    }
851
852    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
853    ///
854    /// # Example
855    /// ```rust
856    /// # #[cfg(feature = "deploy")] {
857    /// # use hydro_lang::prelude::*;
858    /// # use futures::StreamExt;
859    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860    /// let tick = process.tick();
861    /// // ticks are lazy by default, forces the second tick to run
862    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863    ///
864    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
865    /// let batch_first_tick = process
866    ///   .source_iter(q!(vec![1, 2, 3, 4]))
867    ///   .batch(&tick, nondet!(/** test */));
868    /// let batch_second_tick = process
869    ///   .source_iter(q!(vec![5, 6, 7, 8]))
870    ///   .batch(&tick, nondet!(/** test */))
871    ///   .defer_tick();
872    /// batch_first_tick.chain(batch_second_tick)
873    ///   .filter_if(signal)
874    ///   .all_ticks()
875    /// # }, |mut stream| async move {
876    /// // [1, 2, 3, 4]
877    /// # for w in vec![1, 2, 3, 4] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// # }
882    /// ```
883    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
884        self.cross_singleton(signal.filter(q!(|b| *b)))
885            .map(q!(|(d, _)| d))
886    }
887
888    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
889    ///
890    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
891    /// leader of a cluster.
892    ///
893    /// # Example
894    /// ```rust
895    /// # #[cfg(feature = "deploy")] {
896    /// # use hydro_lang::prelude::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// // ticks are lazy by default, forces the second tick to run
901    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902    ///
903    /// let batch_first_tick = process
904    ///   .source_iter(q!(vec![1, 2, 3, 4]))
905    ///   .batch(&tick, nondet!(/** test */));
906    /// let batch_second_tick = process
907    ///   .source_iter(q!(vec![5, 6, 7, 8]))
908    ///   .batch(&tick, nondet!(/** test */))
909    ///   .defer_tick(); // appears on the second tick
910    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
911    /// batch_first_tick.chain(batch_second_tick)
912    ///   .filter_if_some(some_on_first_tick)
913    ///   .all_ticks()
914    /// # }, |mut stream| async move {
915    /// // [1, 2, 3, 4]
916    /// # for w in vec![1, 2, 3, 4] {
917    /// #     assert_eq!(stream.next().await.unwrap(), w);
918    /// # }
919    /// # }));
920    /// # }
921    /// ```
922    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
923    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
924        self.filter_if(signal.is_some())
925    }
926
927    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
928    ///
929    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
930    /// some local state.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// let tick = process.tick();
939    /// // ticks are lazy by default, forces the second tick to run
940    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
941    ///
942    /// let batch_first_tick = process
943    ///   .source_iter(q!(vec![1, 2, 3, 4]))
944    ///   .batch(&tick, nondet!(/** test */));
945    /// let batch_second_tick = process
946    ///   .source_iter(q!(vec![5, 6, 7, 8]))
947    ///   .batch(&tick, nondet!(/** test */))
948    ///   .defer_tick(); // appears on the second tick
949    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
950    /// batch_first_tick.chain(batch_second_tick)
951    ///   .filter_if_none(some_on_first_tick)
952    ///   .all_ticks()
953    /// # }, |mut stream| async move {
954    /// // [5, 6, 7, 8]
955    /// # for w in vec![5, 6, 7, 8] {
956    /// #     assert_eq!(stream.next().await.unwrap(), w);
957    /// # }
958    /// # }));
959    /// # }
960    /// ```
961    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
962    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
963        self.filter_if(other.is_none())
964    }
965
966    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
967    /// tupled pairs in a non-deterministic order.
968    ///
969    /// # Example
970    /// ```rust
971    /// # #[cfg(feature = "deploy")] {
972    /// # use hydro_lang::prelude::*;
973    /// # use std::collections::HashSet;
974    /// # use futures::StreamExt;
975    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976    /// let tick = process.tick();
977    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
978    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
979    /// stream1.cross_product(stream2)
980    /// # }, |mut stream| async move {
981    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
982    /// # stream.map(|i| assert!(expected.contains(&i)));
983    /// # }));
984    /// # }
985    /// ```
986    pub fn cross_product<T2, O2: Ordering>(
987        self,
988        other: Stream<T2, L, B, O2, R>,
989    ) -> Stream<(T, T2), L, B, NoOrder, R>
990    where
991        T: Clone,
992        T2: Clone,
993    {
994        check_matching_location(&self.location, &other.location);
995
996        Stream::new(
997            self.location.clone(),
998            HydroNode::CrossProduct {
999                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1000                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1001                metadata: self
1002                    .location
1003                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
1004            },
1005        )
1006    }
1007
1008    /// Takes one stream as input and filters out any duplicate occurrences. The output
1009    /// contains all unique values from the input.
1010    ///
1011    /// # Example
1012    /// ```rust
1013    /// # #[cfg(feature = "deploy")] {
1014    /// # use hydro_lang::prelude::*;
1015    /// # use futures::StreamExt;
1016    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1017    /// let tick = process.tick();
1018    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1019    /// # }, |mut stream| async move {
1020    /// # for w in vec![1, 2, 3, 4] {
1021    /// #     assert_eq!(stream.next().await.unwrap(), w);
1022    /// # }
1023    /// # }));
1024    /// # }
1025    /// ```
1026    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1027    where
1028        T: Eq + Hash,
1029    {
1030        Stream::new(
1031            self.location.clone(),
1032            HydroNode::Unique {
1033                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1034                metadata: self
1035                    .location
1036                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1037            },
1038        )
1039    }
1040
1041    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1042    ///
1043    /// The `other` stream must be [`Bounded`], since this function will wait until
1044    /// all its elements are available before producing any output.
1045    /// # Example
1046    /// ```rust
1047    /// # #[cfg(feature = "deploy")] {
1048    /// # use hydro_lang::prelude::*;
1049    /// # use futures::StreamExt;
1050    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1051    /// let tick = process.tick();
1052    /// let stream = process
1053    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1054    ///   .batch(&tick, nondet!(/** test */));
1055    /// let batch = process
1056    ///   .source_iter(q!(vec![1, 2]))
1057    ///   .batch(&tick, nondet!(/** test */));
1058    /// stream.filter_not_in(batch).all_ticks()
1059    /// # }, |mut stream| async move {
1060    /// # for w in vec![3, 4] {
1061    /// #     assert_eq!(stream.next().await.unwrap(), w);
1062    /// # }
1063    /// # }));
1064    /// # }
1065    /// ```
1066    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1067    where
1068        T: Eq + Hash,
1069        B2: IsBounded,
1070    {
1071        check_matching_location(&self.location, &other.location);
1072
1073        Stream::new(
1074            self.location.clone(),
1075            HydroNode::Difference {
1076                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1077                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1078                metadata: self
1079                    .location
1080                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1081            },
1082        )
1083    }
1084
1085    /// An operator which allows you to "inspect" each element of a stream without
1086    /// modifying it. The closure `f` is called on a reference to each item. This is
1087    /// mainly useful for debugging, and should not be used to generate side-effects.
1088    ///
1089    /// # Example
1090    /// ```rust
1091    /// # #[cfg(feature = "deploy")] {
1092    /// # use hydro_lang::prelude::*;
1093    /// # use futures::StreamExt;
1094    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095    /// let nums = process.source_iter(q!(vec![1, 2]));
1096    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1097    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1098    /// # }, |mut stream| async move {
1099    /// # for w in vec![1, 2] {
1100    /// #     assert_eq!(stream.next().await.unwrap(), w);
1101    /// # }
1102    /// # }));
1103    /// # }
1104    /// ```
1105    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1106    where
1107        F: Fn(&T) + 'a,
1108    {
1109        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1110
1111        Stream::new(
1112            self.location.clone(),
1113            HydroNode::Inspect {
1114                f,
1115                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1116                metadata: self.location.new_node_metadata(Self::collection_kind()),
1117            },
1118        )
1119    }
1120
1121    /// Executes the provided closure for every element in this stream.
1122    ///
1123    /// Because the closure may have side effects, the stream must have deterministic order
1124    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1125    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1126    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1127    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1128    where
1129        O: IsOrdered,
1130        R: IsExactlyOnce,
1131    {
1132        let f = f.splice_fn1_ctx(&self.location).into();
1133        self.location
1134            .flow_state()
1135            .borrow_mut()
1136            .push_root(HydroRoot::ForEach {
1137                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138                f,
1139                op_metadata: HydroIrOpMetadata::new(),
1140            });
1141    }
1142
1143    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1144    /// TCP socket to some other server. You should _not_ use this API for interacting with
1145    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1146    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1147    /// interaction with asynchronous sinks.
1148    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1149    where
1150        O: IsOrdered,
1151        R: IsExactlyOnce,
1152        S: 'a + futures::Sink<T> + Unpin,
1153    {
1154        self.location
1155            .flow_state()
1156            .borrow_mut()
1157            .push_root(HydroRoot::DestSink {
1158                sink: sink.splice_typed_ctx(&self.location).into(),
1159                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1160                op_metadata: HydroIrOpMetadata::new(),
1161            });
1162    }
1163
1164    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1165    ///
1166    /// # Example
1167    /// ```rust
1168    /// # #[cfg(feature = "deploy")] {
1169    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1170    /// # use futures::StreamExt;
1171    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1172    /// let tick = process.tick();
1173    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1174    /// numbers.enumerate()
1175    /// # }, |mut stream| async move {
1176    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1177    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1178    /// #     assert_eq!(stream.next().await.unwrap(), w);
1179    /// # }
1180    /// # }));
1181    /// # }
1182    /// ```
1183    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1184    where
1185        O: IsOrdered,
1186        R: IsExactlyOnce,
1187    {
1188        Stream::new(
1189            self.location.clone(),
1190            HydroNode::Enumerate {
1191                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1192                metadata: self.location.new_node_metadata(Stream::<
1193                    (usize, T),
1194                    L,
1195                    B,
1196                    TotalOrder,
1197                    ExactlyOnce,
1198                >::collection_kind()),
1199            },
1200        )
1201    }
1202
1203    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1204    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1205    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1206    ///
1207    /// Depending on the input stream guarantees, the closure may need to be commutative
1208    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1209    ///
1210    /// # Example
1211    /// ```rust
1212    /// # #[cfg(feature = "deploy")] {
1213    /// # use hydro_lang::prelude::*;
1214    /// # use futures::StreamExt;
1215    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1217    /// words
1218    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1219    ///     .into_stream()
1220    /// # }, |mut stream| async move {
1221    /// // "HELLOWORLD"
1222    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1223    /// # }));
1224    /// # }
1225    /// ```
1226    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1227        self,
1228        init: impl IntoQuotedMut<'a, I, L>,
1229        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1230    ) -> Singleton<A, L, B2>
1231    where
1232        I: Fn() -> A + 'a,
1233        F: Fn(&mut A, T),
1234        C: ValidCommutativityFor<O>,
1235        Idemp: ValidIdempotenceFor<R>,
1236        B: ApplyMonotoneStream<M, B2>,
1237    {
1238        let init = init.splice_fn0_ctx(&self.location).into();
1239        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1240        proof.register_proof(&comb);
1241
1242        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1243        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1244
1245        let core = HydroNode::Fold {
1246            init,
1247            acc: comb.into(),
1248            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1249            metadata: ordered_etc
1250                .location
1251                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1252        };
1253
1254        Singleton::new(ordered_etc.location.clone(), core)
1255    }
1256
1257    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260    /// reference, so that it can be modified in place.
1261    ///
1262    /// Depending on the input stream guarantees, the closure may need to be commutative
1263    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1264    ///
1265    /// # Example
1266    /// ```rust
1267    /// # #[cfg(feature = "deploy")] {
1268    /// # use hydro_lang::prelude::*;
1269    /// # use futures::StreamExt;
1270    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271    /// let bools = process.source_iter(q!(vec![false, true, false]));
1272    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1273    /// # }, |mut stream| async move {
1274    /// // true
1275    /// # assert_eq!(stream.next().await.unwrap(), true);
1276    /// # }));
1277    /// # }
1278    /// ```
1279    pub fn reduce<F, C, Idemp>(
1280        self,
1281        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1282    ) -> Optional<T, L, B>
1283    where
1284        F: Fn(&mut T, T) + 'a,
1285        C: ValidCommutativityFor<O>,
1286        Idemp: ValidIdempotenceFor<R>,
1287    {
1288        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1289        proof.register_proof(&f);
1290
1291        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1292        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1293
1294        let core = HydroNode::Reduce {
1295            f: f.into(),
1296            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1297            metadata: ordered_etc
1298                .location
1299                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1300        };
1301
1302        Optional::new(ordered_etc.location.clone(), core)
1303    }
1304
1305    /// Computes the maximum element in the stream as an [`Optional`], which
1306    /// will be empty until the first element in the input arrives.
1307    ///
1308    /// # Example
1309    /// ```rust
1310    /// # #[cfg(feature = "deploy")] {
1311    /// # use hydro_lang::prelude::*;
1312    /// # use futures::StreamExt;
1313    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1314    /// let tick = process.tick();
1315    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1316    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1317    /// batch.max().all_ticks()
1318    /// # }, |mut stream| async move {
1319    /// // 4
1320    /// # assert_eq!(stream.next().await.unwrap(), 4);
1321    /// # }));
1322    /// # }
1323    /// ```
1324    pub fn max(self) -> Optional<T, L, B>
1325    where
1326        T: Ord,
1327    {
1328        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1329            .assume_ordering_trusted_bounded::<TotalOrder>(
1330                nondet!(/** max is commutative, but order affects intermediates */),
1331            )
1332            .reduce(q!(|curr, new| {
1333                if new > *curr {
1334                    *curr = new;
1335                }
1336            }))
1337    }
1338
1339    /// Computes the minimum element in the stream as an [`Optional`], which
1340    /// will be empty until the first element in the input arrives.
1341    ///
1342    /// # Example
1343    /// ```rust
1344    /// # #[cfg(feature = "deploy")] {
1345    /// # use hydro_lang::prelude::*;
1346    /// # use futures::StreamExt;
1347    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348    /// let tick = process.tick();
1349    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1350    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1351    /// batch.min().all_ticks()
1352    /// # }, |mut stream| async move {
1353    /// // 1
1354    /// # assert_eq!(stream.next().await.unwrap(), 1);
1355    /// # }));
1356    /// # }
1357    /// ```
1358    pub fn min(self) -> Optional<T, L, B>
1359    where
1360        T: Ord,
1361    {
1362        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1363            .assume_ordering_trusted_bounded::<TotalOrder>(
1364                nondet!(/** max is commutative, but order affects intermediates */),
1365            )
1366            .reduce(q!(|curr, new| {
1367                if new < *curr {
1368                    *curr = new;
1369                }
1370            }))
1371    }
1372
1373    /// Computes the first element in the stream as an [`Optional`], which
1374    /// will be empty until the first element in the input arrives.
1375    ///
1376    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1377    /// re-ordering of elements may cause the first element to change.
1378    ///
1379    /// # Example
1380    /// ```rust
1381    /// # #[cfg(feature = "deploy")] {
1382    /// # use hydro_lang::prelude::*;
1383    /// # use futures::StreamExt;
1384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1385    /// let tick = process.tick();
1386    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1387    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1388    /// batch.first().all_ticks()
1389    /// # }, |mut stream| async move {
1390    /// // 1
1391    /// # assert_eq!(stream.next().await.unwrap(), 1);
1392    /// # }));
1393    /// # }
1394    /// ```
1395    pub fn first(self) -> Optional<T, L, B>
1396    where
1397        O: IsOrdered,
1398    {
1399        self.make_totally_ordered()
1400            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1401            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1402            .reduce(q!(|_, _| {}))
1403    }
1404
1405    /// Computes the last element in the stream as an [`Optional`], which
1406    /// will be empty until an element in the input arrives.
1407    ///
1408    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1409    /// re-ordering of elements may cause the last element to change.
1410    ///
1411    /// # Example
1412    /// ```rust
1413    /// # #[cfg(feature = "deploy")] {
1414    /// # use hydro_lang::prelude::*;
1415    /// # use futures::StreamExt;
1416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1417    /// let tick = process.tick();
1418    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1419    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1420    /// batch.last().all_ticks()
1421    /// # }, |mut stream| async move {
1422    /// // 4
1423    /// # assert_eq!(stream.next().await.unwrap(), 4);
1424    /// # }));
1425    /// # }
1426    /// ```
1427    pub fn last(self) -> Optional<T, L, B>
1428    where
1429        O: IsOrdered,
1430    {
1431        self.make_totally_ordered()
1432            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1433            .reduce(q!(|curr, new| *curr = new))
1434    }
1435
1436    /// Returns a stream containing at most the first `n` elements of the input stream,
1437    /// preserving the original order. Similar to `LIMIT` in SQL.
1438    ///
1439    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1440    /// retries, since the result depends on the order and cardinality of elements.
1441    ///
1442    /// # Example
1443    /// ```rust
1444    /// # #[cfg(feature = "deploy")] {
1445    /// # use hydro_lang::prelude::*;
1446    /// # use futures::StreamExt;
1447    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1448    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1449    /// numbers.limit(q!(3))
1450    /// # }, |mut stream| async move {
1451    /// // 10, 20, 30
1452    /// # for w in vec![10, 20, 30] {
1453    /// #     assert_eq!(stream.next().await.unwrap(), w);
1454    /// # }
1455    /// # }));
1456    /// # }
1457    /// ```
1458    pub fn limit(
1459        self,
1460        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1461    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1462    where
1463        O: IsOrdered,
1464        R: IsExactlyOnce,
1465    {
1466        self.generator(
1467            q!(|| 0usize),
1468            q!(move |count, item| {
1469                if *count == n {
1470                    Generate::Break
1471                } else {
1472                    *count += 1;
1473                    if *count == n {
1474                        Generate::Return(item)
1475                    } else {
1476                        Generate::Yield(item)
1477                    }
1478                }
1479            }),
1480        )
1481    }
1482
1483    /// Collects all the elements of this stream into a single [`Vec`] element.
1484    ///
1485    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1486    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1487    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1488    /// the vector at an arbitrary point in time.
1489    ///
1490    /// # Example
1491    /// ```rust
1492    /// # #[cfg(feature = "deploy")] {
1493    /// # use hydro_lang::prelude::*;
1494    /// # use futures::StreamExt;
1495    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1496    /// let tick = process.tick();
1497    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1498    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1499    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1500    /// # }, |mut stream| async move {
1501    /// // [ vec![1, 2, 3, 4] ]
1502    /// # for w in vec![vec![1, 2, 3, 4]] {
1503    /// #     assert_eq!(stream.next().await.unwrap(), w);
1504    /// # }
1505    /// # }));
1506    /// # }
1507    /// ```
1508    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1509    where
1510        O: IsOrdered,
1511        R: IsExactlyOnce,
1512    {
1513        self.make_totally_ordered().make_exactly_once().fold(
1514            q!(|| vec![]),
1515            q!(|acc, v| {
1516                acc.push(v);
1517            }),
1518        )
1519    }
1520
1521    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1522    /// and emitting each intermediate result.
1523    ///
1524    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1525    /// containing all intermediate accumulated values. The scan operation can also terminate early
1526    /// by returning `None`.
1527    ///
1528    /// The function takes a mutable reference to the accumulator and the current element, and returns
1529    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1530    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1531    ///
1532    /// # Examples
1533    ///
1534    /// Basic usage - running sum:
1535    /// ```rust
1536    /// # #[cfg(feature = "deploy")] {
1537    /// # use hydro_lang::prelude::*;
1538    /// # use futures::StreamExt;
1539    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1540    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1541    ///     q!(|| 0),
1542    ///     q!(|acc, x| {
1543    ///         *acc += x;
1544    ///         Some(*acc)
1545    ///     }),
1546    /// )
1547    /// # }, |mut stream| async move {
1548    /// // Output: 1, 3, 6, 10
1549    /// # for w in vec![1, 3, 6, 10] {
1550    /// #     assert_eq!(stream.next().await.unwrap(), w);
1551    /// # }
1552    /// # }));
1553    /// # }
1554    /// ```
1555    ///
1556    /// Early termination example:
1557    /// ```rust
1558    /// # #[cfg(feature = "deploy")] {
1559    /// # use hydro_lang::prelude::*;
1560    /// # use futures::StreamExt;
1561    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1562    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1563    ///     q!(|| 1),
1564    ///     q!(|state, x| {
1565    ///         *state = *state * x;
1566    ///         if *state > 6 {
1567    ///             None // Terminate the stream
1568    ///         } else {
1569    ///             Some(-*state)
1570    ///         }
1571    ///     }),
1572    /// )
1573    /// # }, |mut stream| async move {
1574    /// // Output: -1, -2, -6
1575    /// # for w in vec![-1, -2, -6] {
1576    /// #     assert_eq!(stream.next().await.unwrap(), w);
1577    /// # }
1578    /// # }));
1579    /// # }
1580    /// ```
1581    pub fn scan<A, U, I, F>(
1582        self,
1583        init: impl IntoQuotedMut<'a, I, L>,
1584        f: impl IntoQuotedMut<'a, F, L>,
1585    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1586    where
1587        O: IsOrdered,
1588        R: IsExactlyOnce,
1589        I: Fn() -> A + 'a,
1590        F: Fn(&mut A, T) -> Option<U> + 'a,
1591    {
1592        let init = init.splice_fn0_ctx(&self.location).into();
1593        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1594
1595        Stream::new(
1596            self.location.clone(),
1597            HydroNode::Scan {
1598                init,
1599                acc: f,
1600                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1601                metadata: self.location.new_node_metadata(
1602                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1603                ),
1604            },
1605        )
1606    }
1607
1608    /// Iteratively processes the elements of the stream using a state machine that can yield
1609    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1610    /// syntax in Rust, without requiring special syntax.
1611    ///
1612    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1613    /// state. The second argument defines the processing logic, taking in a mutable reference
1614    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1615    /// variants define what is emitted and whether further inputs should be processed.
1616    ///
1617    /// # Example
1618    /// ```rust
1619    /// # #[cfg(feature = "deploy")] {
1620    /// # use hydro_lang::prelude::*;
1621    /// # use futures::StreamExt;
1622    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1623    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1624    ///     q!(|| 0),
1625    ///     q!(|acc, x| {
1626    ///         *acc += x;
1627    ///         if *acc > 100 {
1628    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1629    ///         } else if *acc % 2 == 0 {
1630    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1631    ///         } else {
1632    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1633    ///         }
1634    ///     }),
1635    /// )
1636    /// # }, |mut stream| async move {
1637    /// // Output: "even", "done!"
1638    /// # let mut results = Vec::new();
1639    /// # for _ in 0..2 {
1640    /// #     results.push(stream.next().await.unwrap());
1641    /// # }
1642    /// # results.sort();
1643    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1644    /// # }));
1645    /// # }
1646    /// ```
1647    pub fn generator<A, U, I, F>(
1648        self,
1649        init: impl IntoQuotedMut<'a, I, L> + Copy,
1650        f: impl IntoQuotedMut<'a, F, L> + Copy,
1651    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1652    where
1653        O: IsOrdered,
1654        R: IsExactlyOnce,
1655        I: Fn() -> A + 'a,
1656        F: Fn(&mut A, T) -> Generate<U> + 'a,
1657    {
1658        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1659        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1660
1661        let this = self.make_totally_ordered().make_exactly_once();
1662
1663        // State is Option<Option<A>>:
1664        //   None = not yet initialized
1665        //   Some(Some(a)) = active with state a
1666        //   Some(None) = terminated
1667        let scan_init = q!(|| None)
1668            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1669            .into();
1670        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1671            if state.is_none() {
1672                *state = Some(Some(init()));
1673            }
1674            match state {
1675                Some(Some(state_value)) => match f(state_value, v) {
1676                    Generate::Yield(out) => Some(Some(out)),
1677                    Generate::Return(out) => {
1678                        *state = Some(None);
1679                        Some(Some(out))
1680                    }
1681                    // Unlike KeyedStream, we can terminate the scan directly on
1682                    // Break/Return because there is only one state (no other keys
1683                    // that still need processing).
1684                    Generate::Break => None,
1685                    Generate::Continue => Some(None),
1686                },
1687                // State is Some(None) after Return; terminate the scan.
1688                _ => None,
1689            }
1690        })
1691        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1692        .into();
1693
1694        let scan_node = HydroNode::Scan {
1695            init: scan_init,
1696            acc: scan_f,
1697            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1698            metadata: this.location.new_node_metadata(Stream::<
1699                Option<U>,
1700                L,
1701                B,
1702                TotalOrder,
1703                ExactlyOnce,
1704            >::collection_kind()),
1705        };
1706
1707        let flatten_f = q!(|d| d)
1708            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1709            .into();
1710        let flatten_node = HydroNode::FlatMap {
1711            f: flatten_f,
1712            input: Box::new(scan_node),
1713            metadata: this
1714                .location
1715                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1716        };
1717
1718        Stream::new(this.location.clone(), flatten_node)
1719    }
1720
1721    /// Given a time interval, returns a stream corresponding to samples taken from the
1722    /// stream roughly at that interval. The output will have elements in the same order
1723    /// as the input, but with arbitrary elements skipped between samples. There is also
1724    /// no guarantee on the exact timing of the samples.
1725    ///
1726    /// # Non-Determinism
1727    /// The output stream is non-deterministic in which elements are sampled, since this
1728    /// is controlled by a clock.
1729    pub fn sample_every(
1730        self,
1731        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1732        nondet: NonDet,
1733    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1734    where
1735        L: NoTick + NoAtomic,
1736    {
1737        let samples = self.location.source_interval(interval, nondet);
1738
1739        let tick = self.location.tick();
1740        self.batch(&tick, nondet)
1741            .filter_if(samples.batch(&tick, nondet).first().is_some())
1742            .all_ticks()
1743            .weaken_retries()
1744    }
1745
1746    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1747    /// stream has not emitted a value since that duration.
1748    ///
1749    /// # Non-Determinism
1750    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1751    /// samples take place, timeouts may be non-deterministically generated or missed,
1752    /// and the notification of the timeout may be delayed as well. There is also no
1753    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1754    /// detected based on when the next sample is taken.
1755    pub fn timeout(
1756        self,
1757        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1758        nondet: NonDet,
1759    ) -> Optional<(), L, Unbounded>
1760    where
1761        L: NoTick + NoAtomic,
1762    {
1763        let tick = self.location.tick();
1764
1765        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1766            q!(|| None),
1767            q!(
1768                |latest, _| {
1769                    *latest = Some(Instant::now());
1770                },
1771                commutative = manual_proof!(/** TODO */)
1772            ),
1773        );
1774
1775        latest_received
1776            .snapshot(&tick, nondet)
1777            .filter_map(q!(move |latest_received| {
1778                if let Some(latest_received) = latest_received {
1779                    if Instant::now().duration_since(latest_received) > duration {
1780                        Some(())
1781                    } else {
1782                        None
1783                    }
1784                } else {
1785                    Some(())
1786                }
1787            }))
1788            .latest()
1789    }
1790
1791    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1792    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1793    ///
1794    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1795    /// processed before an acknowledgement is emitted.
1796    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1797        let id = self.location.flow_state().borrow_mut().next_clock_id();
1798        let out_location = Atomic {
1799            tick: Tick {
1800                id,
1801                l: self.location.clone(),
1802            },
1803        };
1804        Stream::new(
1805            out_location.clone(),
1806            HydroNode::BeginAtomic {
1807                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1808                metadata: out_location
1809                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1810            },
1811        )
1812    }
1813
1814    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1815    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1816    /// the order of the input. The output stream will execute in the [`Tick`] that was
1817    /// used to create the atomic section.
1818    ///
1819    /// # Non-Determinism
1820    /// The batch boundaries are non-deterministic and may change across executions.
1821    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1822        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1823        Stream::new(
1824            tick.clone(),
1825            HydroNode::Batch {
1826                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1827                metadata: tick
1828                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1829            },
1830        )
1831    }
1832
1833    /// An operator which allows you to "name" a `HydroNode`.
1834    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1835    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1836        {
1837            let mut node = self.ir_node.borrow_mut();
1838            let metadata = node.metadata_mut();
1839            metadata.tag = Some(name.to_owned());
1840        }
1841        self
1842    }
1843
1844    /// Explicitly "casts" the stream to a type with a different ordering
1845    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1846    /// by the type-system.
1847    ///
1848    /// # Non-Determinism
1849    /// This function is used as an escape hatch, and any mistakes in the
1850    /// provided ordering guarantee will propagate into the guarantees
1851    /// for the rest of the program.
1852    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1853        if O::ORDERING_KIND == O2::ORDERING_KIND {
1854            Stream::new(
1855                self.location.clone(),
1856                self.ir_node.replace(HydroNode::Placeholder),
1857            )
1858        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1859            // We can always weaken the ordering guarantee
1860            Stream::new(
1861                self.location.clone(),
1862                HydroNode::Cast {
1863                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1864                    metadata: self
1865                        .location
1866                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1867                },
1868            )
1869        } else {
1870            Stream::new(
1871                self.location.clone(),
1872                HydroNode::ObserveNonDet {
1873                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1874                    trusted: false,
1875                    metadata: self
1876                        .location
1877                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1878                },
1879            )
1880        }
1881    }
1882
1883    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1884    // intermediate states will not be revealed
1885    fn assume_ordering_trusted_bounded<O2: Ordering>(
1886        self,
1887        nondet: NonDet,
1888    ) -> Stream<T, L, B, O2, R> {
1889        if B::BOUNDED {
1890            self.assume_ordering_trusted(nondet)
1891        } else {
1892            self.assume_ordering(nondet)
1893        }
1894    }
1895
1896    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1897    // is not observable
1898    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1899        self,
1900        _nondet: NonDet,
1901    ) -> Stream<T, L, B, O2, R> {
1902        if O::ORDERING_KIND == O2::ORDERING_KIND {
1903            Stream::new(
1904                self.location.clone(),
1905                self.ir_node.replace(HydroNode::Placeholder),
1906            )
1907        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1908            // We can always weaken the ordering guarantee
1909            Stream::new(
1910                self.location.clone(),
1911                HydroNode::Cast {
1912                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1913                    metadata: self
1914                        .location
1915                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1916                },
1917            )
1918        } else {
1919            Stream::new(
1920                self.location.clone(),
1921                HydroNode::ObserveNonDet {
1922                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1923                    trusted: true,
1924                    metadata: self
1925                        .location
1926                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1927                },
1928            )
1929        }
1930    }
1931
1932    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1933    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1934    /// which is always safe because that is the weakest possible guarantee.
1935    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1936        self.weaken_ordering::<NoOrder>()
1937    }
1938
1939    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1940    /// enforcing that `O2` is weaker than the input ordering guarantee.
1941    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1942        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1943        self.assume_ordering::<O2>(nondet)
1944    }
1945
1946    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1947    /// implies that `O == TotalOrder`.
1948    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1949    where
1950        O: IsOrdered,
1951    {
1952        self.assume_ordering(nondet!(/** no-op */))
1953    }
1954
1955    /// Explicitly "casts" the stream to a type with a different retries
1956    /// guarantee. Useful in unsafe code where the lack of retries cannot
1957    /// be proven by the type-system.
1958    ///
1959    /// # Non-Determinism
1960    /// This function is used as an escape hatch, and any mistakes in the
1961    /// provided retries guarantee will propagate into the guarantees
1962    /// for the rest of the program.
1963    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1964        if R::RETRIES_KIND == R2::RETRIES_KIND {
1965            Stream::new(
1966                self.location.clone(),
1967                self.ir_node.replace(HydroNode::Placeholder),
1968            )
1969        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1970            // We can always weaken the retries guarantee
1971            Stream::new(
1972                self.location.clone(),
1973                HydroNode::Cast {
1974                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1975                    metadata: self
1976                        .location
1977                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1978                },
1979            )
1980        } else {
1981            Stream::new(
1982                self.location.clone(),
1983                HydroNode::ObserveNonDet {
1984                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1985                    trusted: false,
1986                    metadata: self
1987                        .location
1988                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1989                },
1990            )
1991        }
1992    }
1993
1994    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1995    // is not observable
1996    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1997        if R::RETRIES_KIND == R2::RETRIES_KIND {
1998            Stream::new(
1999                self.location.clone(),
2000                self.ir_node.replace(HydroNode::Placeholder),
2001            )
2002        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2003            // We can always weaken the retries guarantee
2004            Stream::new(
2005                self.location.clone(),
2006                HydroNode::Cast {
2007                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2008                    metadata: self
2009                        .location
2010                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2011                },
2012            )
2013        } else {
2014            Stream::new(
2015                self.location.clone(),
2016                HydroNode::ObserveNonDet {
2017                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2018                    trusted: true,
2019                    metadata: self
2020                        .location
2021                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2022                },
2023            )
2024        }
2025    }
2026
2027    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2028    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2029    /// which is always safe because that is the weakest possible guarantee.
2030    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2031        self.weaken_retries::<AtLeastOnce>()
2032    }
2033
2034    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2035    /// enforcing that `R2` is weaker than the input retries guarantee.
2036    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2037        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2038        self.assume_retries::<R2>(nondet)
2039    }
2040
2041    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2042    /// implies that `R == ExactlyOnce`.
2043    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2044    where
2045        R: IsExactlyOnce,
2046    {
2047        self.assume_retries(nondet!(/** no-op */))
2048    }
2049
2050    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2051    /// implies that `B == Bounded`.
2052    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2053    where
2054        B: IsBounded,
2055    {
2056        Stream::new(
2057            self.location.clone(),
2058            self.ir_node.replace(HydroNode::Placeholder),
2059        )
2060    }
2061}
2062
2063impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2064where
2065    L: Location<'a>,
2066{
2067    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2068    ///
2069    /// # Example
2070    /// ```rust
2071    /// # #[cfg(feature = "deploy")] {
2072    /// # use hydro_lang::prelude::*;
2073    /// # use futures::StreamExt;
2074    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2075    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2076    /// # }, |mut stream| async move {
2077    /// // 1, 2, 3
2078    /// # for w in vec![1, 2, 3] {
2079    /// #     assert_eq!(stream.next().await.unwrap(), w);
2080    /// # }
2081    /// # }));
2082    /// # }
2083    /// ```
2084    pub fn cloned(self) -> Stream<T, L, B, O, R>
2085    where
2086        T: Clone,
2087    {
2088        self.map(q!(|d| d.clone()))
2089    }
2090}
2091
2092impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2093where
2094    L: Location<'a>,
2095{
2096    /// Computes the number of elements in the stream as a [`Singleton`].
2097    ///
2098    /// # Example
2099    /// ```rust
2100    /// # #[cfg(feature = "deploy")] {
2101    /// # use hydro_lang::prelude::*;
2102    /// # use futures::StreamExt;
2103    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2104    /// let tick = process.tick();
2105    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2106    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2107    /// batch.count().all_ticks()
2108    /// # }, |mut stream| async move {
2109    /// // 4
2110    /// # assert_eq!(stream.next().await.unwrap(), 4);
2111    /// # }));
2112    /// # }
2113    /// ```
2114    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2115        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2116            /// Order does not affect eventual count, and also does not affect intermediate states.
2117        ))
2118        .fold(
2119            q!(|| 0usize),
2120            q!(
2121                |count, _| *count += 1,
2122                monotone = manual_proof!(/** += 1 is monotone */)
2123            ),
2124        )
2125    }
2126}
2127
2128impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2129    /// Produces a new stream that merges the elements of the two input streams.
2130    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2131    ///
2132    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2133    /// [`Bounded`], you can use [`Stream::chain`] instead.
2134    ///
2135    /// # Example
2136    /// ```rust
2137    /// # #[cfg(feature = "deploy")] {
2138    /// # use hydro_lang::prelude::*;
2139    /// # use futures::StreamExt;
2140    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2141    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2142    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2143    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2144    /// # }, |mut stream| async move {
2145    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2146    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2147    /// #     assert_eq!(stream.next().await.unwrap(), w);
2148    /// # }
2149    /// # }));
2150    /// # }
2151    /// ```
2152    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2153        self,
2154        other: Stream<T, L, Unbounded, O2, R2>,
2155    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2156    where
2157        R: MinRetries<R2>,
2158    {
2159        Stream::new(
2160            self.location.clone(),
2161            HydroNode::Chain {
2162                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2164                metadata: self.location.new_node_metadata(Stream::<
2165                    T,
2166                    L,
2167                    Unbounded,
2168                    NoOrder,
2169                    <R as MinRetries<R2>>::Min,
2170                >::collection_kind()),
2171            },
2172        )
2173    }
2174
2175    /// Deprecated: use [`Stream::merge_unordered`] instead.
2176    #[deprecated(note = "use `merge_unordered` instead")]
2177    pub fn interleave<O2: Ordering, R2: Retries>(
2178        self,
2179        other: Stream<T, L, Unbounded, O2, R2>,
2180    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2181    where
2182        R: MinRetries<R2>,
2183    {
2184        self.merge_unordered(other)
2185    }
2186}
2187
2188impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2189    /// Produces a new stream that combines the elements of the two input streams,
2190    /// preserving the relative order of elements within each input.
2191    ///
2192    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2193    /// [`Bounded`], you can use [`Stream::chain`] instead.
2194    ///
2195    /// # Non-Determinism
2196    /// The order in which elements *across* the two streams will be interleaved is
2197    /// non-deterministic, so the order of elements will vary across runs. If the output order
2198    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2199    /// unordered stream.
2200    ///
2201    /// # Example
2202    /// ```rust
2203    /// # #[cfg(feature = "deploy")] {
2204    /// # use hydro_lang::prelude::*;
2205    /// # use futures::StreamExt;
2206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2208    /// # process.source_iter(q!(vec![1, 3])).into();
2209    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2210    /// # }, |mut stream| async move {
2211    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2212    /// # for w in vec![1, 3, 2, 4] {
2213    /// #     assert_eq!(stream.next().await.unwrap(), w);
2214    /// # }
2215    /// # }));
2216    /// # }
2217    /// ```
2218    pub fn merge_ordered<R2: Retries>(
2219        self,
2220        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2221        _nondet: NonDet,
2222    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2223    where
2224        R: MinRetries<R2>,
2225    {
2226        Stream::new(
2227            self.location.clone(),
2228            HydroNode::Chain {
2229                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2230                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2231                metadata: self.location.new_node_metadata(Stream::<
2232                    T,
2233                    L,
2234                    Unbounded,
2235                    TotalOrder,
2236                    <R as MinRetries<R2>>::Min,
2237                >::collection_kind()),
2238            },
2239        )
2240    }
2241}
2242
2243impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2244where
2245    L: Location<'a>,
2246{
2247    /// Produces a new stream that emits the input elements in sorted order.
2248    ///
2249    /// The input stream can have any ordering guarantee, but the output stream
2250    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2251    /// elements in the input stream are available, so it requires the input stream
2252    /// to be [`Bounded`].
2253    ///
2254    /// # Example
2255    /// ```rust
2256    /// # #[cfg(feature = "deploy")] {
2257    /// # use hydro_lang::prelude::*;
2258    /// # use futures::StreamExt;
2259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2260    /// let tick = process.tick();
2261    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2262    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2263    /// batch.sort().all_ticks()
2264    /// # }, |mut stream| async move {
2265    /// // 1, 2, 3, 4
2266    /// # for w in (1..5) {
2267    /// #     assert_eq!(stream.next().await.unwrap(), w);
2268    /// # }
2269    /// # }));
2270    /// # }
2271    /// ```
2272    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2273    where
2274        B: IsBounded,
2275        T: Ord,
2276    {
2277        let this = self.make_bounded();
2278        Stream::new(
2279            this.location.clone(),
2280            HydroNode::Sort {
2281                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2282                metadata: this
2283                    .location
2284                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2285            },
2286        )
2287    }
2288
2289    /// Produces a new stream that first emits the elements of the `self` stream,
2290    /// and then emits the elements of the `other` stream. The output stream has
2291    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2292    /// [`TotalOrder`] guarantee.
2293    ///
2294    /// Currently, both input streams must be [`Bounded`]. This operator will block
2295    /// on the first stream until all its elements are available. In a future version,
2296    /// we will relax the requirement on the `other` stream.
2297    ///
2298    /// # Example
2299    /// ```rust
2300    /// # #[cfg(feature = "deploy")] {
2301    /// # use hydro_lang::prelude::*;
2302    /// # use futures::StreamExt;
2303    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2304    /// let tick = process.tick();
2305    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2306    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2307    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2308    /// # }, |mut stream| async move {
2309    /// // 2, 3, 4, 5, 1, 2, 3, 4
2310    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2311    /// #     assert_eq!(stream.next().await.unwrap(), w);
2312    /// # }
2313    /// # }));
2314    /// # }
2315    /// ```
2316    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2317        self,
2318        other: Stream<T, L, B2, O2, R2>,
2319    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2320    where
2321        B: IsBounded,
2322        O: MinOrder<O2>,
2323        R: MinRetries<R2>,
2324    {
2325        check_matching_location(&self.location, &other.location);
2326
2327        Stream::new(
2328            self.location.clone(),
2329            HydroNode::Chain {
2330                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2331                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2332                metadata: self.location.new_node_metadata(Stream::<
2333                    T,
2334                    L,
2335                    B2,
2336                    <O as MinOrder<O2>>::Min,
2337                    <R as MinRetries<R2>>::Min,
2338                >::collection_kind()),
2339            },
2340        )
2341    }
2342
2343    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2344    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2345    /// because this is compiled into a nested loop.
2346    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2347        self,
2348        other: Stream<T2, L, Bounded, O2, R>,
2349    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2350    where
2351        B: IsBounded,
2352        T: Clone,
2353        T2: Clone,
2354    {
2355        let this = self.make_bounded();
2356        check_matching_location(&this.location, &other.location);
2357
2358        Stream::new(
2359            this.location.clone(),
2360            HydroNode::CrossProduct {
2361                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2362                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2363                metadata: this.location.new_node_metadata(Stream::<
2364                    (T, T2),
2365                    L,
2366                    Bounded,
2367                    <O2 as MinOrder<O>>::Min,
2368                    R,
2369                >::collection_kind()),
2370            },
2371        )
2372    }
2373
2374    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2375    /// `self` used as the values for *each* key.
2376    ///
2377    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2378    /// values. For example, it can be used to send the same set of elements to several cluster
2379    /// members, if the membership information is available as a [`KeyedSingleton`].
2380    ///
2381    /// # Example
2382    /// ```rust
2383    /// # #[cfg(feature = "deploy")] {
2384    /// # use hydro_lang::prelude::*;
2385    /// # use futures::StreamExt;
2386    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2387    /// # let tick = process.tick();
2388    /// let keyed_singleton = // { 1: (), 2: () }
2389    /// # process
2390    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2391    /// #     .into_keyed()
2392    /// #     .batch(&tick, nondet!(/** test */))
2393    /// #     .first();
2394    /// let stream = // [ "a", "b" ]
2395    /// # process
2396    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2397    /// #     .batch(&tick, nondet!(/** test */));
2398    /// stream.repeat_with_keys(keyed_singleton)
2399    /// # .entries().all_ticks()
2400    /// # }, |mut stream| async move {
2401    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2402    /// # let mut results = Vec::new();
2403    /// # for _ in 0..4 {
2404    /// #     results.push(stream.next().await.unwrap());
2405    /// # }
2406    /// # results.sort();
2407    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2408    /// # }));
2409    /// # }
2410    /// ```
2411    pub fn repeat_with_keys<K, V2>(
2412        self,
2413        keys: KeyedSingleton<K, V2, L, Bounded>,
2414    ) -> KeyedStream<K, T, L, Bounded, O, R>
2415    where
2416        B: IsBounded,
2417        K: Clone,
2418        T: Clone,
2419    {
2420        keys.keys()
2421            .weaken_retries()
2422            .assume_ordering_trusted::<TotalOrder>(
2423                nondet!(/** keyed stream does not depend on ordering of keys */),
2424            )
2425            .cross_product_nested_loop(self.make_bounded())
2426            .into_keyed()
2427    }
2428
2429    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2430    /// execution until all results are available. The output order is based on when futures
2431    /// complete, and may be different than the input order.
2432    ///
2433    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2434    /// while futures are pending, this variant blocks until the futures resolve.
2435    ///
2436    /// # Example
2437    /// ```rust
2438    /// # #[cfg(feature = "deploy")] {
2439    /// # use std::collections::HashSet;
2440    /// # use futures::StreamExt;
2441    /// # use hydro_lang::prelude::*;
2442    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2443    /// process
2444    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2445    ///     .map(q!(|x| async move {
2446    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2447    ///         x
2448    ///     }))
2449    ///     .resolve_futures_blocking()
2450    /// #   },
2451    /// #   |mut stream| async move {
2452    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2453    /// #       let mut output = HashSet::new();
2454    /// #       for _ in 1..10 {
2455    /// #           output.insert(stream.next().await.unwrap());
2456    /// #       }
2457    /// #       assert_eq!(
2458    /// #           output,
2459    /// #           HashSet::<i32>::from_iter(1..10)
2460    /// #       );
2461    /// #   },
2462    /// # ));
2463    /// # }
2464    /// ```
2465    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2466    where
2467        T: Future,
2468    {
2469        Stream::new(
2470            self.location.clone(),
2471            HydroNode::ResolveFuturesBlocking {
2472                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2473                metadata: self
2474                    .location
2475                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2476            },
2477        )
2478    }
2479
2480    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2481    ///
2482    /// # Example
2483    /// ```rust
2484    /// # #[cfg(feature = "deploy")] {
2485    /// # use hydro_lang::prelude::*;
2486    /// # use futures::StreamExt;
2487    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2488    /// let tick = process.tick();
2489    /// let empty: Stream<i32, _, Bounded> = process
2490    ///   .source_iter(q!(Vec::<i32>::new()))
2491    ///   .batch(&tick, nondet!(/** test */));
2492    /// empty.is_empty().all_ticks()
2493    /// # }, |mut stream| async move {
2494    /// // true
2495    /// # assert_eq!(stream.next().await.unwrap(), true);
2496    /// # }));
2497    /// # }
2498    /// ```
2499    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2500    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2501    where
2502        B: IsBounded,
2503    {
2504        self.make_bounded()
2505            .assume_ordering_trusted::<TotalOrder>(
2506                nondet!(/** is_empty intermediates unaffected by order */),
2507            )
2508            .first()
2509            .is_none()
2510    }
2511}
2512
2513impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2514where
2515    L: Location<'a>,
2516{
2517    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2518    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2519    /// by equi-joining the two streams on the key attribute `K`.
2520    ///
2521    /// # Example
2522    /// ```rust
2523    /// # #[cfg(feature = "deploy")] {
2524    /// # use hydro_lang::prelude::*;
2525    /// # use std::collections::HashSet;
2526    /// # use futures::StreamExt;
2527    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2528    /// let tick = process.tick();
2529    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2530    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2531    /// stream1.join(stream2)
2532    /// # }, |mut stream| async move {
2533    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2534    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2535    /// # stream.map(|i| assert!(expected.contains(&i)));
2536    /// # }));
2537    /// # }
2538    pub fn join<V2, O2: Ordering, R2: Retries>(
2539        self,
2540        n: Stream<(K, V2), L, B, O2, R2>,
2541    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2542    where
2543        K: Eq + Hash,
2544        R: MinRetries<R2>,
2545    {
2546        check_matching_location(&self.location, &n.location);
2547
2548        Stream::new(
2549            self.location.clone(),
2550            HydroNode::Join {
2551                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2552                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2553                metadata: self.location.new_node_metadata(Stream::<
2554                    (K, (V1, V2)),
2555                    L,
2556                    B,
2557                    NoOrder,
2558                    <R as MinRetries<R2>>::Min,
2559                >::collection_kind()),
2560            },
2561        )
2562    }
2563
2564    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2565    /// computes the anti-join of the items in the input -- i.e. returns
2566    /// unique items in the first input that do not have a matching key
2567    /// in the second input.
2568    ///
2569    /// # Example
2570    /// ```rust
2571    /// # #[cfg(feature = "deploy")] {
2572    /// # use hydro_lang::prelude::*;
2573    /// # use futures::StreamExt;
2574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2575    /// let tick = process.tick();
2576    /// let stream = process
2577    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2578    ///   .batch(&tick, nondet!(/** test */));
2579    /// let batch = process
2580    ///   .source_iter(q!(vec![1, 2]))
2581    ///   .batch(&tick, nondet!(/** test */));
2582    /// stream.anti_join(batch).all_ticks()
2583    /// # }, |mut stream| async move {
2584    /// # for w in vec![(3, 'c'), (4, 'd')] {
2585    /// #     assert_eq!(stream.next().await.unwrap(), w);
2586    /// # }
2587    /// # }));
2588    /// # }
2589    pub fn anti_join<O2: Ordering, R2: Retries>(
2590        self,
2591        n: Stream<K, L, Bounded, O2, R2>,
2592    ) -> Stream<(K, V1), L, B, O, R>
2593    where
2594        K: Eq + Hash,
2595    {
2596        check_matching_location(&self.location, &n.location);
2597
2598        Stream::new(
2599            self.location.clone(),
2600            HydroNode::AntiJoin {
2601                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2602                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2603                metadata: self
2604                    .location
2605                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2606            },
2607        )
2608    }
2609}
2610
2611impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2612    Stream<(K, V), L, B, O, R>
2613{
2614    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2615    /// is used as the key and the second element is added to the entries associated with that key.
2616    ///
2617    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2618    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2619    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2620    /// total ordering _within_ each group but no ordering _across_ groups.
2621    ///
2622    /// # Example
2623    /// ```rust
2624    /// # #[cfg(feature = "deploy")] {
2625    /// # use hydro_lang::prelude::*;
2626    /// # use futures::StreamExt;
2627    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2628    /// process
2629    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2630    ///     .into_keyed()
2631    /// #   .entries()
2632    /// # }, |mut stream| async move {
2633    /// // { 1: [2, 3], 2: [4] }
2634    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2635    /// #     assert_eq!(stream.next().await.unwrap(), w);
2636    /// # }
2637    /// # }));
2638    /// # }
2639    /// ```
2640    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2641        KeyedStream::new(
2642            self.location.clone(),
2643            HydroNode::Cast {
2644                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2645                metadata: self
2646                    .location
2647                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2648            },
2649        )
2650    }
2651}
2652
2653impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2654where
2655    K: Eq + Hash,
2656    L: Location<'a>,
2657{
2658    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2659    /// # Example
2660    /// ```rust
2661    /// # #[cfg(feature = "deploy")] {
2662    /// # use hydro_lang::prelude::*;
2663    /// # use futures::StreamExt;
2664    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2665    /// let tick = process.tick();
2666    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2667    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2668    /// batch.keys().all_ticks()
2669    /// # }, |mut stream| async move {
2670    /// // 1, 2
2671    /// # assert_eq!(stream.next().await.unwrap(), 1);
2672    /// # assert_eq!(stream.next().await.unwrap(), 2);
2673    /// # }));
2674    /// # }
2675    /// ```
2676    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2677        self.into_keyed()
2678            .fold(
2679                q!(|| ()),
2680                q!(
2681                    |_, _| {},
2682                    commutative = manual_proof!(/** values are ignored */),
2683                    idempotent = manual_proof!(/** values are ignored */)
2684                ),
2685            )
2686            .keys()
2687    }
2688}
2689
2690impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2691where
2692    L: Location<'a> + NoTick,
2693{
2694    /// Returns a stream corresponding to the latest batch of elements being atomically
2695    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2696    /// the order of the input.
2697    ///
2698    /// # Non-Determinism
2699    /// The batch boundaries are non-deterministic and may change across executions.
2700    pub fn batch_atomic(
2701        self,
2702        tick: &Tick<L>,
2703        _nondet: NonDet,
2704    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2705        Stream::new(
2706            tick.clone(),
2707            HydroNode::Batch {
2708                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2709                metadata: tick
2710                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2711            },
2712        )
2713    }
2714
2715    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2716    /// See [`Stream::atomic`] for more details.
2717    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2718        Stream::new(
2719            self.location.tick.l.clone(),
2720            HydroNode::EndAtomic {
2721                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2722                metadata: self
2723                    .location
2724                    .tick
2725                    .l
2726                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2727            },
2728        )
2729    }
2730}
2731
2732impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2733where
2734    L: Location<'a> + NoTick + NoAtomic,
2735    F: Future<Output = T>,
2736{
2737    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2738    /// Future outputs are produced as available, regardless of input arrival order.
2739    ///
2740    /// # Example
2741    /// ```rust
2742    /// # #[cfg(feature = "deploy")] {
2743    /// # use std::collections::HashSet;
2744    /// # use futures::StreamExt;
2745    /// # use hydro_lang::prelude::*;
2746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2747    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2748    ///     .map(q!(|x| async move {
2749    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2750    ///         x
2751    ///     }))
2752    ///     .resolve_futures()
2753    /// #   },
2754    /// #   |mut stream| async move {
2755    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2756    /// #       let mut output = HashSet::new();
2757    /// #       for _ in 1..10 {
2758    /// #           output.insert(stream.next().await.unwrap());
2759    /// #       }
2760    /// #       assert_eq!(
2761    /// #           output,
2762    /// #           HashSet::<i32>::from_iter(1..10)
2763    /// #       );
2764    /// #   },
2765    /// # ));
2766    /// # }
2767    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2768        Stream::new(
2769            self.location.clone(),
2770            HydroNode::ResolveFutures {
2771                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2772                metadata: self
2773                    .location
2774                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2775            },
2776        )
2777    }
2778
2779    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2780    /// Future outputs are produced in the same order as the input stream.
2781    ///
2782    /// # Example
2783    /// ```rust
2784    /// # #[cfg(feature = "deploy")] {
2785    /// # use std::collections::HashSet;
2786    /// # use futures::StreamExt;
2787    /// # use hydro_lang::prelude::*;
2788    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2789    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2790    ///     .map(q!(|x| async move {
2791    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2792    ///         x
2793    ///     }))
2794    ///     .resolve_futures_ordered()
2795    /// #   },
2796    /// #   |mut stream| async move {
2797    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2798    /// #       let mut output = Vec::new();
2799    /// #       for _ in 1..10 {
2800    /// #           output.push(stream.next().await.unwrap());
2801    /// #       }
2802    /// #       assert_eq!(
2803    /// #           output,
2804    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2805    /// #       );
2806    /// #   },
2807    /// # ));
2808    /// # }
2809    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2810        Stream::new(
2811            self.location.clone(),
2812            HydroNode::ResolveFuturesOrdered {
2813                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2814                metadata: self
2815                    .location
2816                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2817            },
2818        )
2819    }
2820}
2821
2822impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2823where
2824    L: Location<'a>,
2825{
2826    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2827    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2828    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2829        Stream::new(
2830            self.location.outer().clone(),
2831            HydroNode::YieldConcat {
2832                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2833                metadata: self
2834                    .location
2835                    .outer()
2836                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2837            },
2838        )
2839    }
2840
2841    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2842    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2843    ///
2844    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2845    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2846    /// stream's [`Tick`] context.
2847    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2848        let out_location = Atomic {
2849            tick: self.location.clone(),
2850        };
2851
2852        Stream::new(
2853            out_location.clone(),
2854            HydroNode::YieldConcat {
2855                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2856                metadata: out_location
2857                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2858            },
2859        )
2860    }
2861
2862    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2863    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2864    /// input.
2865    ///
2866    /// This API is particularly useful for stateful computation on batches of data, such as
2867    /// maintaining an accumulated state that is up to date with the current batch.
2868    ///
2869    /// # Example
2870    /// ```rust
2871    /// # #[cfg(feature = "deploy")] {
2872    /// # use hydro_lang::prelude::*;
2873    /// # use futures::StreamExt;
2874    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2875    /// let tick = process.tick();
2876    /// # // ticks are lazy by default, forces the second tick to run
2877    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2878    /// # let batch_first_tick = process
2879    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2880    /// #  .batch(&tick, nondet!(/** test */));
2881    /// # let batch_second_tick = process
2882    /// #   .source_iter(q!(vec![5, 6, 7]))
2883    /// #   .batch(&tick, nondet!(/** test */))
2884    /// #   .defer_tick(); // appears on the second tick
2885    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2886    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2887    ///
2888    /// input.batch(&tick, nondet!(/** test */))
2889    ///     .across_ticks(|s| s.count()).all_ticks()
2890    /// # }, |mut stream| async move {
2891    /// // [4, 7]
2892    /// assert_eq!(stream.next().await.unwrap(), 4);
2893    /// assert_eq!(stream.next().await.unwrap(), 7);
2894    /// # }));
2895    /// # }
2896    /// ```
2897    pub fn across_ticks<Out: BatchAtomic>(
2898        self,
2899        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2900    ) -> Out::Batched {
2901        thunk(self.all_ticks_atomic()).batched_atomic()
2902    }
2903
2904    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2905    /// always has the elements of `self` at tick `T - 1`.
2906    ///
2907    /// At tick `0`, the output stream is empty, since there is no previous tick.
2908    ///
2909    /// This operator enables stateful iterative processing with ticks, by sending data from one
2910    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2911    ///
2912    /// # Example
2913    /// ```rust
2914    /// # #[cfg(feature = "deploy")] {
2915    /// # use hydro_lang::prelude::*;
2916    /// # use futures::StreamExt;
2917    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2918    /// let tick = process.tick();
2919    /// // ticks are lazy by default, forces the second tick to run
2920    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2921    ///
2922    /// let batch_first_tick = process
2923    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2924    ///   .batch(&tick, nondet!(/** test */));
2925    /// let batch_second_tick = process
2926    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2927    ///   .batch(&tick, nondet!(/** test */))
2928    ///   .defer_tick(); // appears on the second tick
2929    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2930    ///
2931    /// changes_across_ticks.clone().filter_not_in(
2932    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2933    /// ).all_ticks()
2934    /// # }, |mut stream| async move {
2935    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2936    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2937    /// #     assert_eq!(stream.next().await.unwrap(), w);
2938    /// # }
2939    /// # }));
2940    /// # }
2941    /// ```
2942    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2943        Stream::new(
2944            self.location.clone(),
2945            HydroNode::DeferTick {
2946                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2947                metadata: self
2948                    .location
2949                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2950            },
2951        )
2952    }
2953}
2954
2955#[cfg(test)]
2956mod tests {
2957    #[cfg(feature = "deploy")]
2958    use futures::{SinkExt, StreamExt};
2959    #[cfg(feature = "deploy")]
2960    use hydro_deploy::Deployment;
2961    #[cfg(feature = "deploy")]
2962    use serde::{Deserialize, Serialize};
2963    #[cfg(any(feature = "deploy", feature = "sim"))]
2964    use stageleft::q;
2965
2966    #[cfg(any(feature = "deploy", feature = "sim"))]
2967    use crate::compile::builder::FlowBuilder;
2968    #[cfg(feature = "deploy")]
2969    use crate::live_collections::sliced::sliced;
2970    #[cfg(feature = "deploy")]
2971    use crate::live_collections::stream::ExactlyOnce;
2972    #[cfg(feature = "sim")]
2973    use crate::live_collections::stream::NoOrder;
2974    #[cfg(any(feature = "deploy", feature = "sim"))]
2975    use crate::live_collections::stream::TotalOrder;
2976    #[cfg(any(feature = "deploy", feature = "sim"))]
2977    use crate::location::Location;
2978    #[cfg(feature = "sim")]
2979    use crate::networking::TCP;
2980    #[cfg(any(feature = "deploy", feature = "sim"))]
2981    use crate::nondet::nondet;
2982
2983    mod backtrace_chained_ops;
2984
2985    #[cfg(feature = "deploy")]
2986    struct P1 {}
2987    #[cfg(feature = "deploy")]
2988    struct P2 {}
2989
2990    #[cfg(feature = "deploy")]
2991    #[derive(Serialize, Deserialize, Debug)]
2992    struct SendOverNetwork {
2993        n: u32,
2994    }
2995
2996    #[cfg(feature = "deploy")]
2997    #[tokio::test]
2998    async fn first_ten_distributed() {
2999        use crate::networking::TCP;
3000
3001        let mut deployment = Deployment::new();
3002
3003        let mut flow = FlowBuilder::new();
3004        let first_node = flow.process::<P1>();
3005        let second_node = flow.process::<P2>();
3006        let external = flow.external::<P2>();
3007
3008        let numbers = first_node.source_iter(q!(0..10));
3009        let out_port = numbers
3010            .map(q!(|n| SendOverNetwork { n }))
3011            .send(&second_node, TCP.fail_stop().bincode())
3012            .send_bincode_external(&external);
3013
3014        let nodes = flow
3015            .with_process(&first_node, deployment.Localhost())
3016            .with_process(&second_node, deployment.Localhost())
3017            .with_external(&external, deployment.Localhost())
3018            .deploy(&mut deployment);
3019
3020        deployment.deploy().await.unwrap();
3021
3022        let mut external_out = nodes.connect(out_port).await;
3023
3024        deployment.start().await.unwrap();
3025
3026        for i in 0..10 {
3027            assert_eq!(external_out.next().await.unwrap().n, i);
3028        }
3029    }
3030
3031    #[cfg(feature = "deploy")]
3032    #[tokio::test]
3033    async fn first_cardinality() {
3034        let mut deployment = Deployment::new();
3035
3036        let mut flow = FlowBuilder::new();
3037        let node = flow.process::<()>();
3038        let external = flow.external::<()>();
3039
3040        let node_tick = node.tick();
3041        let count = node_tick
3042            .singleton(q!([1, 2, 3]))
3043            .into_stream()
3044            .flatten_ordered()
3045            .first()
3046            .into_stream()
3047            .count()
3048            .all_ticks()
3049            .send_bincode_external(&external);
3050
3051        let nodes = flow
3052            .with_process(&node, deployment.Localhost())
3053            .with_external(&external, deployment.Localhost())
3054            .deploy(&mut deployment);
3055
3056        deployment.deploy().await.unwrap();
3057
3058        let mut external_out = nodes.connect(count).await;
3059
3060        deployment.start().await.unwrap();
3061
3062        assert_eq!(external_out.next().await.unwrap(), 1);
3063    }
3064
3065    #[cfg(feature = "deploy")]
3066    #[tokio::test]
3067    async fn unbounded_reduce_remembers_state() {
3068        let mut deployment = Deployment::new();
3069
3070        let mut flow = FlowBuilder::new();
3071        let node = flow.process::<()>();
3072        let external = flow.external::<()>();
3073
3074        let (input_port, input) = node.source_external_bincode(&external);
3075        let out = input
3076            .reduce(q!(|acc, v| *acc += v))
3077            .sample_eager(nondet!(/** test */))
3078            .send_bincode_external(&external);
3079
3080        let nodes = flow
3081            .with_process(&node, deployment.Localhost())
3082            .with_external(&external, deployment.Localhost())
3083            .deploy(&mut deployment);
3084
3085        deployment.deploy().await.unwrap();
3086
3087        let mut external_in = nodes.connect(input_port).await;
3088        let mut external_out = nodes.connect(out).await;
3089
3090        deployment.start().await.unwrap();
3091
3092        external_in.send(1).await.unwrap();
3093        assert_eq!(external_out.next().await.unwrap(), 1);
3094
3095        external_in.send(2).await.unwrap();
3096        assert_eq!(external_out.next().await.unwrap(), 3);
3097    }
3098
3099    #[cfg(feature = "deploy")]
3100    #[tokio::test]
3101    async fn top_level_bounded_cross_singleton() {
3102        let mut deployment = Deployment::new();
3103
3104        let mut flow = FlowBuilder::new();
3105        let node = flow.process::<()>();
3106        let external = flow.external::<()>();
3107
3108        let (input_port, input) =
3109            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3110
3111        let out = input
3112            .cross_singleton(
3113                node.source_iter(q!(vec![1, 2, 3]))
3114                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3115            )
3116            .send_bincode_external(&external);
3117
3118        let nodes = flow
3119            .with_process(&node, deployment.Localhost())
3120            .with_external(&external, deployment.Localhost())
3121            .deploy(&mut deployment);
3122
3123        deployment.deploy().await.unwrap();
3124
3125        let mut external_in = nodes.connect(input_port).await;
3126        let mut external_out = nodes.connect(out).await;
3127
3128        deployment.start().await.unwrap();
3129
3130        external_in.send(1).await.unwrap();
3131        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3132
3133        external_in.send(2).await.unwrap();
3134        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3135    }
3136
3137    #[cfg(feature = "deploy")]
3138    #[tokio::test]
3139    async fn top_level_bounded_reduce_cardinality() {
3140        let mut deployment = Deployment::new();
3141
3142        let mut flow = FlowBuilder::new();
3143        let node = flow.process::<()>();
3144        let external = flow.external::<()>();
3145
3146        let (input_port, input) =
3147            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3148
3149        let out = sliced! {
3150            let input = use(input, nondet!(/** test */));
3151            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3152            input.cross_singleton(v.into_stream().count())
3153        }
3154        .send_bincode_external(&external);
3155
3156        let nodes = flow
3157            .with_process(&node, deployment.Localhost())
3158            .with_external(&external, deployment.Localhost())
3159            .deploy(&mut deployment);
3160
3161        deployment.deploy().await.unwrap();
3162
3163        let mut external_in = nodes.connect(input_port).await;
3164        let mut external_out = nodes.connect(out).await;
3165
3166        deployment.start().await.unwrap();
3167
3168        external_in.send(1).await.unwrap();
3169        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3170
3171        external_in.send(2).await.unwrap();
3172        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3173    }
3174
3175    #[cfg(feature = "deploy")]
3176    #[tokio::test]
3177    async fn top_level_bounded_into_singleton_cardinality() {
3178        let mut deployment = Deployment::new();
3179
3180        let mut flow = FlowBuilder::new();
3181        let node = flow.process::<()>();
3182        let external = flow.external::<()>();
3183
3184        let (input_port, input) =
3185            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3186
3187        let out = sliced! {
3188            let input = use(input, nondet!(/** test */));
3189            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3190            input.cross_singleton(v.into_stream().count())
3191        }
3192        .send_bincode_external(&external);
3193
3194        let nodes = flow
3195            .with_process(&node, deployment.Localhost())
3196            .with_external(&external, deployment.Localhost())
3197            .deploy(&mut deployment);
3198
3199        deployment.deploy().await.unwrap();
3200
3201        let mut external_in = nodes.connect(input_port).await;
3202        let mut external_out = nodes.connect(out).await;
3203
3204        deployment.start().await.unwrap();
3205
3206        external_in.send(1).await.unwrap();
3207        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3208
3209        external_in.send(2).await.unwrap();
3210        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3211    }
3212
3213    #[cfg(feature = "deploy")]
3214    #[tokio::test]
3215    async fn atomic_fold_replays_each_tick() {
3216        let mut deployment = Deployment::new();
3217
3218        let mut flow = FlowBuilder::new();
3219        let node = flow.process::<()>();
3220        let external = flow.external::<()>();
3221
3222        let (input_port, input) =
3223            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3224        let tick = node.tick();
3225
3226        let out = input
3227            .batch(&tick, nondet!(/** test */))
3228            .cross_singleton(
3229                node.source_iter(q!(vec![1, 2, 3]))
3230                    .atomic()
3231                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3232                    .snapshot_atomic(&tick, nondet!(/** test */)),
3233            )
3234            .all_ticks()
3235            .send_bincode_external(&external);
3236
3237        let nodes = flow
3238            .with_process(&node, deployment.Localhost())
3239            .with_external(&external, deployment.Localhost())
3240            .deploy(&mut deployment);
3241
3242        deployment.deploy().await.unwrap();
3243
3244        let mut external_in = nodes.connect(input_port).await;
3245        let mut external_out = nodes.connect(out).await;
3246
3247        deployment.start().await.unwrap();
3248
3249        external_in.send(1).await.unwrap();
3250        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3251
3252        external_in.send(2).await.unwrap();
3253        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3254    }
3255
3256    #[cfg(feature = "deploy")]
3257    #[tokio::test]
3258    async fn unbounded_scan_remembers_state() {
3259        let mut deployment = Deployment::new();
3260
3261        let mut flow = FlowBuilder::new();
3262        let node = flow.process::<()>();
3263        let external = flow.external::<()>();
3264
3265        let (input_port, input) = node.source_external_bincode(&external);
3266        let out = input
3267            .scan(
3268                q!(|| 0),
3269                q!(|acc, v| {
3270                    *acc += v;
3271                    Some(*acc)
3272                }),
3273            )
3274            .send_bincode_external(&external);
3275
3276        let nodes = flow
3277            .with_process(&node, deployment.Localhost())
3278            .with_external(&external, deployment.Localhost())
3279            .deploy(&mut deployment);
3280
3281        deployment.deploy().await.unwrap();
3282
3283        let mut external_in = nodes.connect(input_port).await;
3284        let mut external_out = nodes.connect(out).await;
3285
3286        deployment.start().await.unwrap();
3287
3288        external_in.send(1).await.unwrap();
3289        assert_eq!(external_out.next().await.unwrap(), 1);
3290
3291        external_in.send(2).await.unwrap();
3292        assert_eq!(external_out.next().await.unwrap(), 3);
3293    }
3294
3295    #[cfg(feature = "deploy")]
3296    #[tokio::test]
3297    async fn unbounded_enumerate_remembers_state() {
3298        let mut deployment = Deployment::new();
3299
3300        let mut flow = FlowBuilder::new();
3301        let node = flow.process::<()>();
3302        let external = flow.external::<()>();
3303
3304        let (input_port, input) = node.source_external_bincode(&external);
3305        let out = input.enumerate().send_bincode_external(&external);
3306
3307        let nodes = flow
3308            .with_process(&node, deployment.Localhost())
3309            .with_external(&external, deployment.Localhost())
3310            .deploy(&mut deployment);
3311
3312        deployment.deploy().await.unwrap();
3313
3314        let mut external_in = nodes.connect(input_port).await;
3315        let mut external_out = nodes.connect(out).await;
3316
3317        deployment.start().await.unwrap();
3318
3319        external_in.send(1).await.unwrap();
3320        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3321
3322        external_in.send(2).await.unwrap();
3323        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3324    }
3325
3326    #[cfg(feature = "deploy")]
3327    #[tokio::test]
3328    async fn unbounded_unique_remembers_state() {
3329        let mut deployment = Deployment::new();
3330
3331        let mut flow = FlowBuilder::new();
3332        let node = flow.process::<()>();
3333        let external = flow.external::<()>();
3334
3335        let (input_port, input) =
3336            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3337        let out = input.unique().send_bincode_external(&external);
3338
3339        let nodes = flow
3340            .with_process(&node, deployment.Localhost())
3341            .with_external(&external, deployment.Localhost())
3342            .deploy(&mut deployment);
3343
3344        deployment.deploy().await.unwrap();
3345
3346        let mut external_in = nodes.connect(input_port).await;
3347        let mut external_out = nodes.connect(out).await;
3348
3349        deployment.start().await.unwrap();
3350
3351        external_in.send(1).await.unwrap();
3352        assert_eq!(external_out.next().await.unwrap(), 1);
3353
3354        external_in.send(2).await.unwrap();
3355        assert_eq!(external_out.next().await.unwrap(), 2);
3356
3357        external_in.send(1).await.unwrap();
3358        external_in.send(3).await.unwrap();
3359        assert_eq!(external_out.next().await.unwrap(), 3);
3360    }
3361
3362    #[cfg(feature = "sim")]
3363    #[test]
3364    #[should_panic]
3365    fn sim_batch_nondet_size() {
3366        let mut flow = FlowBuilder::new();
3367        let node = flow.process::<()>();
3368
3369        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3370
3371        let tick = node.tick();
3372        let out_recv = input
3373            .batch(&tick, nondet!(/** test */))
3374            .count()
3375            .all_ticks()
3376            .sim_output();
3377
3378        flow.sim().exhaustive(async || {
3379            in_send.send(());
3380            in_send.send(());
3381            in_send.send(());
3382
3383            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3384        });
3385    }
3386
3387    #[cfg(feature = "sim")]
3388    #[test]
3389    fn sim_batch_preserves_order() {
3390        let mut flow = FlowBuilder::new();
3391        let node = flow.process::<()>();
3392
3393        let (in_send, input) = node.sim_input();
3394
3395        let tick = node.tick();
3396        let out_recv = input
3397            .batch(&tick, nondet!(/** test */))
3398            .all_ticks()
3399            .sim_output();
3400
3401        flow.sim().exhaustive(async || {
3402            in_send.send(1);
3403            in_send.send(2);
3404            in_send.send(3);
3405
3406            out_recv.assert_yields_only([1, 2, 3]).await;
3407        });
3408    }
3409
3410    #[cfg(feature = "sim")]
3411    #[test]
3412    #[should_panic]
3413    fn sim_batch_unordered_shuffles() {
3414        let mut flow = FlowBuilder::new();
3415        let node = flow.process::<()>();
3416
3417        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3418
3419        let tick = node.tick();
3420        let batch = input.batch(&tick, nondet!(/** test */));
3421        let out_recv = batch
3422            .clone()
3423            .min()
3424            .zip(batch.max())
3425            .all_ticks()
3426            .sim_output();
3427
3428        flow.sim().exhaustive(async || {
3429            in_send.send_many_unordered([1, 2, 3]);
3430
3431            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3432                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3433            }
3434        });
3435    }
3436
3437    #[cfg(feature = "sim")]
3438    #[test]
3439    fn sim_batch_unordered_shuffles_count() {
3440        let mut flow = FlowBuilder::new();
3441        let node = flow.process::<()>();
3442
3443        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3444
3445        let tick = node.tick();
3446        let batch = input.batch(&tick, nondet!(/** test */));
3447        let out_recv = batch.all_ticks().sim_output();
3448
3449        let instance_count = flow.sim().exhaustive(async || {
3450            in_send.send_many_unordered([1, 2, 3, 4]);
3451            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3452        });
3453
3454        assert_eq!(
3455            instance_count,
3456            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3457        )
3458    }
3459
3460    #[cfg(feature = "sim")]
3461    #[test]
3462    #[should_panic]
3463    fn sim_observe_order_batched() {
3464        let mut flow = FlowBuilder::new();
3465        let node = flow.process::<()>();
3466
3467        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3468
3469        let tick = node.tick();
3470        let batch = input.batch(&tick, nondet!(/** test */));
3471        let out_recv = batch
3472            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3473            .all_ticks()
3474            .sim_output();
3475
3476        flow.sim().exhaustive(async || {
3477            in_send.send_many_unordered([1, 2, 3, 4]);
3478            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3479        });
3480    }
3481
3482    #[cfg(feature = "sim")]
3483    #[test]
3484    fn sim_observe_order_batched_count() {
3485        let mut flow = FlowBuilder::new();
3486        let node = flow.process::<()>();
3487
3488        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3489
3490        let tick = node.tick();
3491        let batch = input.batch(&tick, nondet!(/** test */));
3492        let out_recv = batch
3493            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3494            .all_ticks()
3495            .sim_output();
3496
3497        let instance_count = flow.sim().exhaustive(async || {
3498            in_send.send_many_unordered([1, 2, 3, 4]);
3499            let _ = out_recv.collect::<Vec<_>>().await;
3500        });
3501
3502        assert_eq!(
3503            instance_count,
3504            192 // 4! * 2^{4 - 1}
3505        )
3506    }
3507
3508    #[cfg(feature = "sim")]
3509    #[test]
3510    fn sim_unordered_count_instance_count() {
3511        let mut flow = FlowBuilder::new();
3512        let node = flow.process::<()>();
3513
3514        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3515
3516        let tick = node.tick();
3517        let out_recv = input
3518            .count()
3519            .snapshot(&tick, nondet!(/** test */))
3520            .all_ticks()
3521            .sim_output();
3522
3523        let instance_count = flow.sim().exhaustive(async || {
3524            in_send.send_many_unordered([1, 2, 3, 4]);
3525            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3526        });
3527
3528        assert_eq!(
3529            instance_count,
3530            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3531        )
3532    }
3533
3534    #[cfg(feature = "sim")]
3535    #[test]
3536    fn sim_top_level_assume_ordering() {
3537        let mut flow = FlowBuilder::new();
3538        let node = flow.process::<()>();
3539
3540        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3541
3542        let out_recv = input
3543            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3544            .sim_output();
3545
3546        let instance_count = flow.sim().exhaustive(async || {
3547            in_send.send_many_unordered([1, 2, 3]);
3548            let mut out = out_recv.collect::<Vec<_>>().await;
3549            out.sort();
3550            assert_eq!(out, vec![1, 2, 3]);
3551        });
3552
3553        assert_eq!(instance_count, 6)
3554    }
3555
3556    #[cfg(feature = "sim")]
3557    #[test]
3558    fn sim_top_level_assume_ordering_cycle_back() {
3559        let mut flow = FlowBuilder::new();
3560        let node = flow.process::<()>();
3561        let node2 = flow.process::<()>();
3562
3563        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3564
3565        let (complete_cycle_back, cycle_back) =
3566            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3567        let ordered = input
3568            .merge_unordered(cycle_back)
3569            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3570        complete_cycle_back.complete(
3571            ordered
3572                .clone()
3573                .map(q!(|v| v + 1))
3574                .filter(q!(|v| v % 2 == 1))
3575                .send(&node2, TCP.fail_stop().bincode())
3576                .send(&node, TCP.fail_stop().bincode()),
3577        );
3578
3579        let out_recv = ordered.sim_output();
3580
3581        let mut saw = false;
3582        let instance_count = flow.sim().exhaustive(async || {
3583            in_send.send_many_unordered([0, 2]);
3584            let out = out_recv.collect::<Vec<_>>().await;
3585
3586            if out.starts_with(&[0, 1, 2]) {
3587                saw = true;
3588            }
3589        });
3590
3591        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3592        assert_eq!(instance_count, 6);
3593    }
3594
3595    #[cfg(feature = "sim")]
3596    #[test]
3597    fn sim_top_level_assume_ordering_cycle_back_tick() {
3598        let mut flow = FlowBuilder::new();
3599        let node = flow.process::<()>();
3600        let node2 = flow.process::<()>();
3601
3602        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3603
3604        let (complete_cycle_back, cycle_back) =
3605            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3606        let ordered = input
3607            .merge_unordered(cycle_back)
3608            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3609        complete_cycle_back.complete(
3610            ordered
3611                .clone()
3612                .batch(&node.tick(), nondet!(/** test */))
3613                .all_ticks()
3614                .map(q!(|v| v + 1))
3615                .filter(q!(|v| v % 2 == 1))
3616                .send(&node2, TCP.fail_stop().bincode())
3617                .send(&node, TCP.fail_stop().bincode()),
3618        );
3619
3620        let out_recv = ordered.sim_output();
3621
3622        let mut saw = false;
3623        let instance_count = flow.sim().exhaustive(async || {
3624            in_send.send_many_unordered([0, 2]);
3625            let out = out_recv.collect::<Vec<_>>().await;
3626
3627            if out.starts_with(&[0, 1, 2]) {
3628                saw = true;
3629            }
3630        });
3631
3632        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3633        assert_eq!(instance_count, 58);
3634    }
3635
3636    #[cfg(feature = "sim")]
3637    #[test]
3638    fn sim_top_level_assume_ordering_multiple() {
3639        let mut flow = FlowBuilder::new();
3640        let node = flow.process::<()>();
3641        let node2 = flow.process::<()>();
3642
3643        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3644        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3645
3646        let (complete_cycle_back, cycle_back) =
3647            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3648        let input1_ordered = input
3649            .clone()
3650            .merge_unordered(cycle_back)
3651            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3652        let foo = input1_ordered
3653            .clone()
3654            .map(q!(|v| v + 3))
3655            .weaken_ordering::<NoOrder>()
3656            .merge_unordered(input2)
3657            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3658
3659        complete_cycle_back.complete(
3660            foo.filter(q!(|v| *v == 3))
3661                .send(&node2, TCP.fail_stop().bincode())
3662                .send(&node, TCP.fail_stop().bincode()),
3663        );
3664
3665        let out_recv = input1_ordered.sim_output();
3666
3667        let mut saw = false;
3668        let instance_count = flow.sim().exhaustive(async || {
3669            in_send.send_many_unordered([0, 1]);
3670            let out = out_recv.collect::<Vec<_>>().await;
3671
3672            if out.starts_with(&[0, 3, 1]) {
3673                saw = true;
3674            }
3675        });
3676
3677        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3678        assert_eq!(instance_count, 24);
3679    }
3680
3681    #[cfg(feature = "sim")]
3682    #[test]
3683    fn sim_atomic_assume_ordering_cycle_back() {
3684        let mut flow = FlowBuilder::new();
3685        let node = flow.process::<()>();
3686        let node2 = flow.process::<()>();
3687
3688        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3689
3690        let (complete_cycle_back, cycle_back) =
3691            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3692        let ordered = input
3693            .merge_unordered(cycle_back)
3694            .atomic()
3695            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3696            .end_atomic();
3697        complete_cycle_back.complete(
3698            ordered
3699                .clone()
3700                .map(q!(|v| v + 1))
3701                .filter(q!(|v| v % 2 == 1))
3702                .send(&node2, TCP.fail_stop().bincode())
3703                .send(&node, TCP.fail_stop().bincode()),
3704        );
3705
3706        let out_recv = ordered.sim_output();
3707
3708        let instance_count = flow.sim().exhaustive(async || {
3709            in_send.send_many_unordered([0, 2]);
3710            let out = out_recv.collect::<Vec<_>>().await;
3711            assert_eq!(out.len(), 4);
3712        });
3713        assert_eq!(instance_count, 22);
3714    }
3715
3716    #[cfg(feature = "deploy")]
3717    #[tokio::test]
3718    async fn partition_evens_odds() {
3719        let mut deployment = Deployment::new();
3720
3721        let mut flow = FlowBuilder::new();
3722        let node = flow.process::<()>();
3723        let external = flow.external::<()>();
3724
3725        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3726        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3727        let evens_port = evens.send_bincode_external(&external);
3728        let odds_port = odds.send_bincode_external(&external);
3729
3730        let nodes = flow
3731            .with_process(&node, deployment.Localhost())
3732            .with_external(&external, deployment.Localhost())
3733            .deploy(&mut deployment);
3734
3735        deployment.deploy().await.unwrap();
3736
3737        let mut evens_out = nodes.connect(evens_port).await;
3738        let mut odds_out = nodes.connect(odds_port).await;
3739
3740        deployment.start().await.unwrap();
3741
3742        let mut even_results = Vec::new();
3743        for _ in 0..3 {
3744            even_results.push(evens_out.next().await.unwrap());
3745        }
3746        even_results.sort();
3747        assert_eq!(even_results, vec![2, 4, 6]);
3748
3749        let mut odd_results = Vec::new();
3750        for _ in 0..3 {
3751            odd_results.push(odds_out.next().await.unwrap());
3752        }
3753        odd_results.sort();
3754        assert_eq!(odd_results, vec![1, 3, 5]);
3755    }
3756
3757    #[cfg(feature = "deploy")]
3758    #[tokio::test]
3759    async fn unconsumed_inspect_still_runs() {
3760        use crate::deploy::DeployCrateWrapper;
3761
3762        let mut deployment = Deployment::new();
3763
3764        let mut flow = FlowBuilder::new();
3765        let node = flow.process::<()>();
3766
3767        // The return value of .inspect() is intentionally dropped.
3768        // Before the Null-root fix, this would silently do nothing.
3769        node.source_iter(q!(0..5))
3770            .inspect(q!(|x| println!("inspect: {}", x)));
3771
3772        let nodes = flow
3773            .with_process(&node, deployment.Localhost())
3774            .deploy(&mut deployment);
3775
3776        deployment.deploy().await.unwrap();
3777
3778        let mut stdout = nodes.get_process(&node).stdout();
3779
3780        deployment.start().await.unwrap();
3781
3782        let mut lines = Vec::new();
3783        for _ in 0..5 {
3784            lines.push(stdout.recv().await.unwrap());
3785        }
3786        lines.sort();
3787        assert_eq!(
3788            lines,
3789            vec![
3790                "inspect: 0",
3791                "inspect: 1",
3792                "inspect: 2",
3793                "inspect: 3",
3794                "inspect: 4",
3795            ]
3796        );
3797    }
3798
3799    #[cfg(feature = "sim")]
3800    #[test]
3801    fn sim_limit() {
3802        let mut flow = FlowBuilder::new();
3803        let node = flow.process::<()>();
3804
3805        let (in_send, input) = node.sim_input();
3806
3807        let out_recv = input.limit(q!(3)).sim_output();
3808
3809        flow.sim().exhaustive(async || {
3810            in_send.send(1);
3811            in_send.send(2);
3812            in_send.send(3);
3813            in_send.send(4);
3814            in_send.send(5);
3815
3816            out_recv.assert_yields_only([1, 2, 3]).await;
3817        });
3818    }
3819
3820    #[cfg(feature = "sim")]
3821    #[test]
3822    fn sim_limit_zero() {
3823        let mut flow = FlowBuilder::new();
3824        let node = flow.process::<()>();
3825
3826        let (in_send, input) = node.sim_input();
3827
3828        let out_recv = input.limit(q!(0)).sim_output();
3829
3830        flow.sim().exhaustive(async || {
3831            in_send.send(1);
3832            in_send.send(2);
3833
3834            out_recv.assert_yields_only::<i32, _>([]).await;
3835        });
3836    }
3837
3838    #[cfg(feature = "deploy")]
3839    #[tokio::test]
3840    async fn monotone_fold_threshold() {
3841        use crate::properties::manual_proof;
3842
3843        let mut deployment = Deployment::new();
3844
3845        let mut flow = FlowBuilder::new();
3846        let node = flow.process::<()>();
3847        let external = flow.external::<()>();
3848
3849        let in_unbounded: super::Stream<_, _> =
3850            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3851        let sum = in_unbounded.fold(
3852            q!(|| 0),
3853            q!(
3854                |sum, v| {
3855                    *sum += v;
3856                },
3857                monotone = manual_proof!(/** test */)
3858            ),
3859        );
3860
3861        let threshold_out = sum
3862            .threshold_greater_or_equal(node.singleton(q!(7)))
3863            .send_bincode_external(&external);
3864
3865        let nodes = flow
3866            .with_process(&node, deployment.Localhost())
3867            .with_external(&external, deployment.Localhost())
3868            .deploy(&mut deployment);
3869
3870        deployment.deploy().await.unwrap();
3871
3872        let mut threshold_out = nodes.connect(threshold_out).await;
3873
3874        deployment.start().await.unwrap();
3875
3876        assert_eq!(threshold_out.next().await.unwrap(), 7);
3877    }
3878
3879    #[cfg(feature = "deploy")]
3880    #[tokio::test]
3881    async fn monotone_count_threshold() {
3882        let mut deployment = Deployment::new();
3883
3884        let mut flow = FlowBuilder::new();
3885        let node = flow.process::<()>();
3886        let external = flow.external::<()>();
3887
3888        let in_unbounded: super::Stream<_, _> =
3889            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3890        let sum = in_unbounded.count();
3891
3892        let threshold_out = sum
3893            .threshold_greater_or_equal(node.singleton(q!(3)))
3894            .send_bincode_external(&external);
3895
3896        let nodes = flow
3897            .with_process(&node, deployment.Localhost())
3898            .with_external(&external, deployment.Localhost())
3899            .deploy(&mut deployment);
3900
3901        deployment.deploy().await.unwrap();
3902
3903        let mut threshold_out = nodes.connect(threshold_out).await;
3904
3905        deployment.start().await.unwrap();
3906
3907        assert_eq!(threshold_out.next().await.unwrap(), 3);
3908    }
3909}