Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds fix how every marker combines under [`MinOrder`]:
/// combining a marker with itself or with [`TotalOrder`] yields the marker itself,
/// while combining it with [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // The IR-level counterpart of this marker.
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
58
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // The IR-level counterpart of this marker.
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the minimum of the
// two orderings, as computed by `MinOrder`, is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// The minimum of `TotalOrder` and any ordering `O` is `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// The minimum of `NoOrder` and any ordering is always `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds fix how every marker combines under [`MinRetries`]:
/// combining a marker with itself or with [`ExactlyOnce`] yields the marker itself,
/// while combining it with [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
106
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // The IR-level counterpart of this marker.
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
115
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // The IR-level counterpart of this marker.
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the minimum of the
// two guarantees, as computed by `MinRetries`, is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The bounds additionally require the result to be convertible-to from both
    /// `Self` and `Other` (i.e. it is weaker than or equal to each of them).
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// The minimum of `ExactlyOnce` and any guarantee `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// The minimum of `AtLeastOnce` and any guarantee is always `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Use it as a bound on APIs that require a totally-ordered input; the
/// `on_unimplemented` attribute above supplies the compiler message shown when
/// the bound is not met.
pub trait IsOrdered: Ordering {}

// `TotalOrder` is the only implementor visible here.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
162
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Use it as a bound on APIs that require an exactly-once input; the
/// `on_unimplemented` attribute above supplies the compiler message shown when
/// the bound is not met.
pub trait IsExactlyOnce: Retries {}

// `ExactlyOnce` is the only implementor visible here.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is being materialized.
    pub(crate) location: Loc,
    // The IR node that produces this stream. Held in a `RefCell` so that operators,
    // `Clone` and `Drop` can swap it out (leaving `HydroNode::Placeholder`) through `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow under construction; `Drop` pushes unconsumed nodes into it.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244    for Stream<T, L, B, NoOrder, R>
245where
246    L: Location<'a>,
247{
248    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249        stream.weaken_ordering()
250    }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254    for Stream<T, L, B, O, AtLeastOnce>
255where
256    L: Location<'a>,
257{
258    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259        stream.weaken_retries()
260    }
261}
262
// Trait-level entry point for deferring a bounded, in-tick stream.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): the explicit `Stream::` path presumably selects the inherent
        // `defer_tick` (defined outside this view) rather than recursing into this
        // trait method — confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
298        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
299            location.clone(),
300            HydroNode::DeferTick {
301                input: Box::new(HydroNode::CycleSource {
302                    cycle_id,
303                    metadata: location.new_node_metadata(Self::collection_kind()),
304                }),
305                metadata: location.new_node_metadata(Self::collection_kind()),
306            },
307        );
308
309        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
310    }
311}
312
313impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
314    for Stream<T, Tick<L>, Bounded, O, R>
315where
316    L: Location<'a>,
317{
318    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
319        assert_eq!(
320            Location::id(&self.location),
321            expected_location,
322            "locations do not match"
323        );
324        self.location
325            .flow_state()
326            .borrow_mut()
327            .push_root(HydroRoot::CycleSink {
328                cycle_id,
329                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
330                op_metadata: HydroIrOpMetadata::new(),
331            });
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
336    for Stream<T, L, B, O, R>
337where
338    L: Location<'a> + NoTick,
339{
340    type Location = L;
341
342    fn create_source(cycle_id: CycleId, location: L) -> Self {
343        Stream::new(
344            location.clone(),
345            HydroNode::CycleSource {
346                cycle_id,
347                metadata: location.new_node_metadata(Self::collection_kind()),
348            },
349        )
350    }
351}
352
353impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
354    for Stream<T, L, B, O, R>
355where
356    L: Location<'a> + NoTick,
357{
358    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
359        assert_eq!(
360            Location::id(&self.location),
361            expected_location,
362            "locations do not match"
363        );
364        self.location
365            .flow_state()
366            .borrow_mut()
367            .push_root(HydroRoot::CycleSink {
368                cycle_id,
369                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
370                op_metadata: HydroIrOpMetadata::new(),
371            });
372    }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
376where
377    T: Clone,
378    L: Location<'a>,
379{
380    fn clone(&self) -> Self {
381        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
382            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
383            *self.ir_node.borrow_mut() = HydroNode::Tee {
384                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
385                metadata: self.location.new_node_metadata(Self::collection_kind()),
386            };
387        }
388
389        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
390            Stream {
391                location: self.location.clone(),
392                flow_state: self.flow_state.clone(),
393                ir_node: HydroNode::Tee {
394                    inner: SharedNode(inner.0.clone()),
395                    metadata: metadata.clone(),
396                }
397                .into(),
398                _phantom: PhantomData,
399            }
400        } else {
401            unreachable!()
402        }
403    }
404}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408    L: Location<'a>,
409{
410    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
411        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
412        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
413
414        let flow_state = location.flow_state().clone();
415        Stream {
416            location,
417            flow_state,
418            ir_node: RefCell::new(ir_node),
419            _phantom: PhantomData,
420        }
421    }
422
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is returned by reference; nothing is cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
427
428    pub(crate) fn collection_kind() -> CollectionKind {
429        CollectionKind::Stream {
430            bound: B::BOUND_KIND,
431            order: O::ORDERING_KIND,
432            retry: R::RETRIES_KIND,
433            element_type: quote_type::<T>().into(),
434        }
435    }
436
437    /// Produces a stream based on invoking `f` on each element.
438    /// If you do not want to modify the stream and instead only want to view
439    /// each item use [`Stream::inspect`] instead.
440    ///
441    /// # Example
442    /// ```rust
443    /// # #[cfg(feature = "deploy")] {
444    /// # use hydro_lang::prelude::*;
445    /// # use futures::StreamExt;
446    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
447    /// let words = process.source_iter(q!(vec!["hello", "world"]));
448    /// words.map(q!(|x| x.to_uppercase()))
449    /// # }, |mut stream| async move {
450    /// # for w in vec!["HELLO", "WORLD"] {
451    /// #     assert_eq!(stream.next().await.unwrap(), w);
452    /// # }
453    /// # }));
454    /// # }
455    /// ```
456    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
457    where
458        F: Fn(T) -> U + 'a,
459    {
460        let f = f.splice_fn1_ctx(&self.location).into();
461        Stream::new(
462            self.location.clone(),
463            HydroNode::Map {
464                f,
465                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466                metadata: self
467                    .location
468                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
469            },
470        )
471    }
472
473    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
474    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
475    /// for the output type `U` must produce items in a **deterministic** order.
476    ///
477    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
478    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
479    ///
480    /// # Example
481    /// ```rust
482    /// # #[cfg(feature = "deploy")] {
483    /// # use hydro_lang::prelude::*;
484    /// # use futures::StreamExt;
485    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
486    /// process
487    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
488    ///     .flat_map_ordered(q!(|x| x))
489    /// # }, |mut stream| async move {
490    /// // 1, 2, 3, 4
491    /// # for w in (1..5) {
492    /// #     assert_eq!(stream.next().await.unwrap(), w);
493    /// # }
494    /// # }));
495    /// # }
496    /// ```
497    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
498    where
499        I: IntoIterator<Item = U>,
500        F: Fn(T) -> I + 'a,
501    {
502        let f = f.splice_fn1_ctx(&self.location).into();
503        Stream::new(
504            self.location.clone(),
505            HydroNode::FlatMap {
506                f,
507                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
508                metadata: self
509                    .location
510                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
511            },
512        )
513    }
514
515    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
516    /// for the output type `U` to produce items in any order.
517    ///
518    /// # Example
519    /// ```rust
520    /// # #[cfg(feature = "deploy")] {
521    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
522    /// # use futures::StreamExt;
523    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
524    /// process
525    ///     .source_iter(q!(vec![
526    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
527    ///         std::collections::HashSet::from_iter(vec![3, 4]),
528    ///     ]))
529    ///     .flat_map_unordered(q!(|x| x))
530    /// # }, |mut stream| async move {
531    /// // 1, 2, 3, 4, but in no particular order
532    /// # let mut results = Vec::new();
533    /// # for w in (1..5) {
534    /// #     results.push(stream.next().await.unwrap());
535    /// # }
536    /// # results.sort();
537    /// # assert_eq!(results, vec![1, 2, 3, 4]);
538    /// # }));
539    /// # }
540    /// ```
541    pub fn flat_map_unordered<U, I, F>(
542        self,
543        f: impl IntoQuotedMut<'a, F, L>,
544    ) -> Stream<U, L, B, NoOrder, R>
545    where
546        I: IntoIterator<Item = U>,
547        F: Fn(T) -> I + 'a,
548    {
549        let f = f.splice_fn1_ctx(&self.location).into();
550        Stream::new(
551            self.location.clone(),
552            HydroNode::FlatMap {
553                f,
554                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
555                metadata: self
556                    .location
557                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
558            },
559        )
560    }
561
562    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
563    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
564    ///
565    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
566    /// not deterministic, use [`Stream::flatten_unordered`] instead.
567    ///
    /// # Example
    /// ```rust
569    /// # #[cfg(feature = "deploy")] {
570    /// # use hydro_lang::prelude::*;
571    /// # use futures::StreamExt;
572    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573    /// process
574    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
575    ///     .flatten_ordered()
576    /// # }, |mut stream| async move {
577    /// // 1, 2, 3, 4
578    /// # for w in (1..5) {
579    /// #     assert_eq!(stream.next().await.unwrap(), w);
580    /// # }
581    /// # }));
582    /// # }
583    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: each element is
        // itself the iterable to expand.
        self.flat_map_ordered(q!(|d| d))
    }
590
591    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
592    /// for the element type `T` to produce items in any order.
593    ///
594    /// # Example
595    /// ```rust
596    /// # #[cfg(feature = "deploy")] {
597    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
598    /// # use futures::StreamExt;
599    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
600    /// process
601    ///     .source_iter(q!(vec![
602    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
603    ///         std::collections::HashSet::from_iter(vec![3, 4]),
604    ///     ]))
605    ///     .flatten_unordered()
606    /// # }, |mut stream| async move {
607    /// // 1, 2, 3, 4, but in no particular order
608    /// # let mut results = Vec::new();
609    /// # for w in (1..5) {
610    /// #     results.push(stream.next().await.unwrap());
611    /// # }
612    /// # results.sort();
613    /// # assert_eq!(results, vec![1, 2, 3, 4]);
614    /// # }));
615    /// # }
616    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure; the unordered
        // variant types the result as `NoOrder`.
        self.flat_map_unordered(q!(|d| d))
    }
623
624    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
625    /// then emit the elements of that stream one by one. When the inner stream yields
626    /// `Pending`, this operator yields as well.
627    pub fn flat_map_stream_blocking<U, S, F>(
628        self,
629        f: impl IntoQuotedMut<'a, F, L>,
630    ) -> Stream<U, L, B, O, R>
631    where
632        S: futures::Stream<Item = U>,
633        F: Fn(T) -> S + 'a,
634    {
635        let f = f.splice_fn1_ctx(&self.location).into();
636        Stream::new(
637            self.location.clone(),
638            HydroNode::FlatMapStreamBlocking {
639                f,
640                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
641                metadata: self
642                    .location
643                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
644            },
645        )
646    }
647
648    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
649    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
650    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Flattening is the blocking flat-map with the identity closure: each
        // element is itself the async stream to drain.
        self.flat_map_stream_blocking(q!(|d| d))
    }
657
658    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
659    /// `f`, preserving the order of the elements.
660    ///
661    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
662    /// not modify or take ownership of the values. If you need to modify the values while filtering
663    /// use [`Stream::filter_map`] instead.
664    ///
665    /// # Example
666    /// ```rust
667    /// # #[cfg(feature = "deploy")] {
668    /// # use hydro_lang::prelude::*;
669    /// # use futures::StreamExt;
670    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
671    /// process
672    ///     .source_iter(q!(vec![1, 2, 3, 4]))
673    ///     .filter(q!(|&x| x > 2))
674    /// # }, |mut stream| async move {
675    /// // 3, 4
676    /// # for w in (3..5) {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// # }
681    /// ```
682    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
683    where
684        F: Fn(&T) -> bool + 'a,
685    {
686        let f = f.splice_fn1_borrow_ctx(&self.location).into();
687        Stream::new(
688            self.location.clone(),
689            HydroNode::Filter {
690                f,
691                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692                metadata: self.location.new_node_metadata(Self::collection_kind()),
693            },
694        )
695    }
696
697    /// Splits the stream into two streams based on a predicate, without cloning elements.
698    ///
699    /// Elements for which `f` returns `true` are sent to the first output stream,
700    /// and elements for which `f` returns `false` are sent to the second output stream.
701    ///
702    /// Unlike using `filter` twice, this only evaluates the predicate once per element
703    /// and does not require `T: Clone`.
704    ///
705    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
706    /// the predicate is only used for routing; the element itself is moved to the
707    /// appropriate output stream.
708    ///
709    /// # Example
710    /// ```rust
711    /// # #[cfg(feature = "deploy")] {
712    /// # use hydro_lang::prelude::*;
713    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
716    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
717    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
718    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
719    /// evens.map(q!(|x| (x, true)))
720    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
721    /// # }, |mut stream| async move {
722    /// # let mut results = Vec::new();
723    /// # for _ in 0..6 {
724    /// #     results.push(stream.next().await.unwrap());
725    /// # }
726    /// # results.sort();
727    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
728    /// # }));
729    /// # }
730    /// ```
731    #[expect(
732        clippy::type_complexity,
733        reason = "return type mirrors the input stream type"
734    )]
735    pub fn partition<F>(
736        self,
737        f: impl IntoQuotedMut<'a, F, L>,
738    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
739    where
740        F: Fn(&T) -> bool + 'a,
741    {
742        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
743        let shared = SharedNode(Rc::new(RefCell::new(
744            self.ir_node.replace(HydroNode::Placeholder),
745        )));
746
747        let true_stream = Stream::new(
748            self.location.clone(),
749            HydroNode::Partition {
750                inner: SharedNode(shared.0.clone()),
751                f: f.clone(),
752                is_true: true,
753                metadata: self.location.new_node_metadata(Self::collection_kind()),
754            },
755        );
756
757        let false_stream = Stream::new(
758            self.location.clone(),
759            HydroNode::Partition {
760                inner: SharedNode(shared.0),
761                f,
762                is_true: false,
763                metadata: self.location.new_node_metadata(Self::collection_kind()),
764            },
765        );
766
767        (true_stream, false_stream)
768    }
769
770    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
771    ///
772    /// # Example
773    /// ```rust
774    /// # #[cfg(feature = "deploy")] {
775    /// # use hydro_lang::prelude::*;
776    /// # use futures::StreamExt;
777    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778    /// process
779    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
780    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
781    /// # }, |mut stream| async move {
782    /// // 1, 2
783    /// # for w in (1..3) {
784    /// #     assert_eq!(stream.next().await.unwrap(), w);
785    /// # }
786    /// # }));
787    /// # }
788    /// ```
789    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
790    where
791        F: Fn(T) -> Option<U> + 'a,
792    {
793        let f = f.splice_fn1_ctx(&self.location).into();
794        Stream::new(
795            self.location.clone(),
796            HydroNode::FilterMap {
797                f,
798                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
799                metadata: self
800                    .location
801                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
802            },
803        )
804    }
805
806    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
807    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
808    /// If `other` is an empty [`Optional`], no values will be produced.
809    ///
810    /// # Example
811    /// ```rust
812    /// # #[cfg(feature = "deploy")] {
813    /// # use hydro_lang::prelude::*;
814    /// # use futures::StreamExt;
815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816    /// let tick = process.tick();
817    /// let batch = process
818    ///   .source_iter(q!(vec![1, 2, 3, 4]))
819    ///   .batch(&tick, nondet!(/** test */));
820    /// let count = batch.clone().count(); // `count()` returns a singleton
821    /// batch.cross_singleton(count).all_ticks()
822    /// # }, |mut stream| async move {
823    /// // (1, 4), (2, 4), (3, 4), (4, 4)
824    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn cross_singleton<O2>(
831        self,
832        other: impl Into<Optional<O2, L, Bounded>>,
833    ) -> Stream<(T, O2), L, B, O, R>
834    where
835        O2: Clone,
836    {
837        let other: Optional<O2, L, Bounded> = other.into();
838        check_matching_location(&self.location, &other.location);
839
840        Stream::new(
841            self.location.clone(),
842            HydroNode::CrossSingleton {
843                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
844                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
845                metadata: self
846                    .location
847                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
848            },
849        )
850    }
851
852    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
853    ///
854    /// # Example
855    /// ```rust
856    /// # #[cfg(feature = "deploy")] {
857    /// # use hydro_lang::prelude::*;
858    /// # use futures::StreamExt;
859    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860    /// let tick = process.tick();
861    /// // ticks are lazy by default, forces the second tick to run
862    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863    ///
864    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
865    /// let batch_first_tick = process
866    ///   .source_iter(q!(vec![1, 2, 3, 4]))
867    ///   .batch(&tick, nondet!(/** test */));
868    /// let batch_second_tick = process
869    ///   .source_iter(q!(vec![5, 6, 7, 8]))
870    ///   .batch(&tick, nondet!(/** test */))
871    ///   .defer_tick();
872    /// batch_first_tick.chain(batch_second_tick)
873    ///   .filter_if(signal)
874    ///   .all_ticks()
875    /// # }, |mut stream| async move {
876    /// // [1, 2, 3, 4]
877    /// # for w in vec![1, 2, 3, 4] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// # }
882    /// ```
883    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
884        self.cross_singleton(signal.filter(q!(|b| *b)))
885            .map(q!(|(d, _)| d))
886    }
887
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
889    ///
890    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
891    /// leader of a cluster.
892    ///
893    /// # Example
894    /// ```rust
895    /// # #[cfg(feature = "deploy")] {
896    /// # use hydro_lang::prelude::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// // ticks are lazy by default, forces the second tick to run
901    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902    ///
903    /// let batch_first_tick = process
904    ///   .source_iter(q!(vec![1, 2, 3, 4]))
905    ///   .batch(&tick, nondet!(/** test */));
906    /// let batch_second_tick = process
907    ///   .source_iter(q!(vec![5, 6, 7, 8]))
908    ///   .batch(&tick, nondet!(/** test */))
909    ///   .defer_tick(); // appears on the second tick
910    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
911    /// batch_first_tick.chain(batch_second_tick)
912    ///   .filter_if_some(some_on_first_tick)
913    ///   .all_ticks()
914    /// # }, |mut stream| async move {
915    /// // [1, 2, 3, 4]
916    /// # for w in vec![1, 2, 3, 4] {
917    /// #     assert_eq!(stream.next().await.unwrap(), w);
918    /// # }
919    /// # }));
920    /// # }
921    /// ```
922    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
923    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
924        self.filter_if(signal.is_some())
925    }
926
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
928    ///
929    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
930    /// some local state.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// let tick = process.tick();
939    /// // ticks are lazy by default, forces the second tick to run
940    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
941    ///
942    /// let batch_first_tick = process
943    ///   .source_iter(q!(vec![1, 2, 3, 4]))
944    ///   .batch(&tick, nondet!(/** test */));
945    /// let batch_second_tick = process
946    ///   .source_iter(q!(vec![5, 6, 7, 8]))
947    ///   .batch(&tick, nondet!(/** test */))
948    ///   .defer_tick(); // appears on the second tick
949    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
950    /// batch_first_tick.chain(batch_second_tick)
951    ///   .filter_if_none(some_on_first_tick)
952    ///   .all_ticks()
953    /// # }, |mut stream| async move {
954    /// // [5, 6, 7, 8]
955    /// # for w in vec![5, 6, 7, 8] {
956    /// #     assert_eq!(stream.next().await.unwrap(), w);
957    /// # }
958    /// # }));
959    /// # }
960    /// ```
961    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
962    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
963        self.filter_if(other.is_none())
964    }
965
966    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
967    /// returning all tupled pairs.
968    ///
969    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
970    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
971    /// symmetric hash join is used and ordering is [`NoOrder`].
972    ///
973    /// # Example
974    /// ```rust
975    /// # #[cfg(feature = "deploy")] {
976    /// # use hydro_lang::prelude::*;
977    /// # use std::collections::HashSet;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// let tick = process.tick();
981    /// let stream1 = process.source_iter(q!(vec![1, 2]));
982    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
983    /// stream1.cross_product(stream2)
984    /// # }, |mut stream| async move {
985    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
986    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
987    /// # stream.map(|i| assert!(expected.contains(&i)));
988    /// # }));
    /// # }
    /// ```
990    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
991        self,
992        other: Stream<T2, L, B2, O2, R>,
993    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
994    where
995        T: Clone,
996        T2: Clone,
997    {
998        self.map(q!(|v| ((), v)))
999            .join(other.map(q!(|v| ((), v))))
1000            .map(q!(|((), (v1, v2))| (v1, v2)))
1001    }
1002
1003    /// Takes one stream as input and filters out any duplicate occurrences. The output
1004    /// contains all unique values from the input.
1005    ///
1006    /// # Example
1007    /// ```rust
1008    /// # #[cfg(feature = "deploy")] {
1009    /// # use hydro_lang::prelude::*;
1010    /// # use futures::StreamExt;
1011    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1012    /// let tick = process.tick();
1013    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1014    /// # }, |mut stream| async move {
1015    /// # for w in vec![1, 2, 3, 4] {
1016    /// #     assert_eq!(stream.next().await.unwrap(), w);
1017    /// # }
1018    /// # }));
1019    /// # }
1020    /// ```
1021    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1022    where
1023        T: Eq + Hash,
1024    {
1025        Stream::new(
1026            self.location.clone(),
1027            HydroNode::Unique {
1028                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1029                metadata: self
1030                    .location
1031                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1032            },
1033        )
1034    }
1035
1036    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1037    ///
1038    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    ///
    /// # Example
1041    /// ```rust
1042    /// # #[cfg(feature = "deploy")] {
1043    /// # use hydro_lang::prelude::*;
1044    /// # use futures::StreamExt;
1045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046    /// let tick = process.tick();
1047    /// let stream = process
1048    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1049    ///   .batch(&tick, nondet!(/** test */));
1050    /// let batch = process
1051    ///   .source_iter(q!(vec![1, 2]))
1052    ///   .batch(&tick, nondet!(/** test */));
1053    /// stream.filter_not_in(batch).all_ticks()
1054    /// # }, |mut stream| async move {
1055    /// # for w in vec![3, 4] {
1056    /// #     assert_eq!(stream.next().await.unwrap(), w);
1057    /// # }
1058    /// # }));
1059    /// # }
1060    /// ```
1061    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1062    where
1063        T: Eq + Hash,
1064        B2: IsBounded,
1065    {
1066        check_matching_location(&self.location, &other.location);
1067
1068        Stream::new(
1069            self.location.clone(),
1070            HydroNode::Difference {
1071                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1072                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1073                metadata: self
1074                    .location
1075                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1076            },
1077        )
1078    }
1079
1080    /// An operator which allows you to "inspect" each element of a stream without
1081    /// modifying it. The closure `f` is called on a reference to each item. This is
1082    /// mainly useful for debugging, and should not be used to generate side-effects.
1083    ///
1084    /// # Example
1085    /// ```rust
1086    /// # #[cfg(feature = "deploy")] {
1087    /// # use hydro_lang::prelude::*;
1088    /// # use futures::StreamExt;
1089    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1090    /// let nums = process.source_iter(q!(vec![1, 2]));
1091    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1092    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1093    /// # }, |mut stream| async move {
1094    /// # for w in vec![1, 2] {
1095    /// #     assert_eq!(stream.next().await.unwrap(), w);
1096    /// # }
1097    /// # }));
1098    /// # }
1099    /// ```
1100    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1101    where
1102        F: Fn(&T) + 'a,
1103    {
1104        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1105
1106        Stream::new(
1107            self.location.clone(),
1108            HydroNode::Inspect {
1109                f,
1110                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1111                metadata: self.location.new_node_metadata(Self::collection_kind()),
1112            },
1113        )
1114    }
1115
1116    /// Executes the provided closure for every element in this stream.
1117    ///
1118    /// Because the closure may have side effects, the stream must have deterministic order
1119    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1120    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1121    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1122    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1123    where
1124        O: IsOrdered,
1125        R: IsExactlyOnce,
1126    {
1127        let f = f.splice_fn1_ctx(&self.location).into();
1128        self.location
1129            .flow_state()
1130            .borrow_mut()
1131            .push_root(HydroRoot::ForEach {
1132                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1133                f,
1134                op_metadata: HydroIrOpMetadata::new(),
1135            });
1136    }
1137
1138    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1139    /// TCP socket to some other server. You should _not_ use this API for interacting with
1140    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1141    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1142    /// interaction with asynchronous sinks.
1143    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1144    where
1145        O: IsOrdered,
1146        R: IsExactlyOnce,
1147        S: 'a + futures::Sink<T> + Unpin,
1148    {
1149        self.location
1150            .flow_state()
1151            .borrow_mut()
1152            .push_root(HydroRoot::DestSink {
1153                sink: sink.splice_typed_ctx(&self.location).into(),
1154                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155                op_metadata: HydroIrOpMetadata::new(),
1156            });
1157    }
1158
1159    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1160    ///
1161    /// # Example
1162    /// ```rust
1163    /// # #[cfg(feature = "deploy")] {
1164    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1165    /// # use futures::StreamExt;
1166    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1167    /// let tick = process.tick();
1168    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1169    /// numbers.enumerate()
1170    /// # }, |mut stream| async move {
1171    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1172    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1173    /// #     assert_eq!(stream.next().await.unwrap(), w);
1174    /// # }
1175    /// # }));
1176    /// # }
1177    /// ```
1178    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1179    where
1180        O: IsOrdered,
1181        R: IsExactlyOnce,
1182    {
1183        Stream::new(
1184            self.location.clone(),
1185            HydroNode::Enumerate {
1186                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1187                metadata: self.location.new_node_metadata(Stream::<
1188                    (usize, T),
1189                    L,
1190                    B,
1191                    TotalOrder,
1192                    ExactlyOnce,
1193                >::collection_kind()),
1194            },
1195        )
1196    }
1197
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1199    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1200    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1201    ///
1202    /// Depending on the input stream guarantees, the closure may need to be commutative
1203    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1204    ///
1205    /// # Example
1206    /// ```rust
1207    /// # #[cfg(feature = "deploy")] {
1208    /// # use hydro_lang::prelude::*;
1209    /// # use futures::StreamExt;
1210    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1211    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1212    /// words
1213    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1214    ///     .into_stream()
1215    /// # }, |mut stream| async move {
1216    /// // "HELLOWORLD"
1217    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1218    /// # }));
1219    /// # }
1220    /// ```
1221    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1222        self,
1223        init: impl IntoQuotedMut<'a, I, L>,
1224        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1225    ) -> Singleton<A, L, B2>
1226    where
1227        I: Fn() -> A + 'a,
1228        F: Fn(&mut A, T),
1229        C: ValidCommutativityFor<O>,
1230        Idemp: ValidIdempotenceFor<R>,
1231        B: ApplyMonotoneStream<M, B2>,
1232    {
1233        let init = init.splice_fn0_ctx(&self.location).into();
1234        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1235        proof.register_proof(&comb);
1236
1237        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1238        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1239
1240        let core = HydroNode::Fold {
1241            init,
1242            acc: comb.into(),
1243            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1244            metadata: ordered_etc
1245                .location
1246                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1247        };
1248
1249        Singleton::new(ordered_etc.location.clone(), core)
1250    }
1251
1252    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1253    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1254    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1255    /// reference, so that it can be modified in place.
1256    ///
1257    /// Depending on the input stream guarantees, the closure may need to be commutative
1258    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1259    ///
1260    /// # Example
1261    /// ```rust
1262    /// # #[cfg(feature = "deploy")] {
1263    /// # use hydro_lang::prelude::*;
1264    /// # use futures::StreamExt;
1265    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1266    /// let bools = process.source_iter(q!(vec![false, true, false]));
1267    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1268    /// # }, |mut stream| async move {
1269    /// // true
1270    /// # assert_eq!(stream.next().await.unwrap(), true);
1271    /// # }));
1272    /// # }
1273    /// ```
1274    pub fn reduce<F, C, Idemp>(
1275        self,
1276        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1277    ) -> Optional<T, L, B>
1278    where
1279        F: Fn(&mut T, T) + 'a,
1280        C: ValidCommutativityFor<O>,
1281        Idemp: ValidIdempotenceFor<R>,
1282    {
1283        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1284        proof.register_proof(&f);
1285
1286        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1287        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1288
1289        let core = HydroNode::Reduce {
1290            f: f.into(),
1291            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1292            metadata: ordered_etc
1293                .location
1294                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1295        };
1296
1297        Optional::new(ordered_etc.location.clone(), core)
1298    }
1299
1300    /// Computes the maximum element in the stream as an [`Optional`], which
1301    /// will be empty until the first element in the input arrives.
1302    ///
1303    /// # Example
1304    /// ```rust
1305    /// # #[cfg(feature = "deploy")] {
1306    /// # use hydro_lang::prelude::*;
1307    /// # use futures::StreamExt;
1308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1309    /// let tick = process.tick();
1310    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1311    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1312    /// batch.max().all_ticks()
1313    /// # }, |mut stream| async move {
1314    /// // 4
1315    /// # assert_eq!(stream.next().await.unwrap(), 4);
1316    /// # }));
1317    /// # }
1318    /// ```
1319    pub fn max(self) -> Optional<T, L, B>
1320    where
1321        T: Ord,
1322    {
1323        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1324            .assume_ordering_trusted_bounded::<TotalOrder>(
1325                nondet!(/** max is commutative, but order affects intermediates */),
1326            )
1327            .reduce(q!(|curr, new| {
1328                if new > *curr {
1329                    *curr = new;
1330                }
1331            }))
1332    }
1333
1334    /// Computes the minimum element in the stream as an [`Optional`], which
1335    /// will be empty until the first element in the input arrives.
1336    ///
1337    /// # Example
1338    /// ```rust
1339    /// # #[cfg(feature = "deploy")] {
1340    /// # use hydro_lang::prelude::*;
1341    /// # use futures::StreamExt;
1342    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1343    /// let tick = process.tick();
1344    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1345    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1346    /// batch.min().all_ticks()
1347    /// # }, |mut stream| async move {
1348    /// // 1
1349    /// # assert_eq!(stream.next().await.unwrap(), 1);
1350    /// # }));
1351    /// # }
1352    /// ```
1353    pub fn min(self) -> Optional<T, L, B>
1354    where
1355        T: Ord,
1356    {
1357        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1358            .assume_ordering_trusted_bounded::<TotalOrder>(
1359                nondet!(/** max is commutative, but order affects intermediates */),
1360            )
1361            .reduce(q!(|curr, new| {
1362                if new < *curr {
1363                    *curr = new;
1364                }
1365            }))
1366    }
1367
1368    /// Computes the first element in the stream as an [`Optional`], which
1369    /// will be empty until the first element in the input arrives.
1370    ///
1371    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1372    /// re-ordering of elements may cause the first element to change.
1373    ///
1374    /// # Example
1375    /// ```rust
1376    /// # #[cfg(feature = "deploy")] {
1377    /// # use hydro_lang::prelude::*;
1378    /// # use futures::StreamExt;
1379    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1380    /// let tick = process.tick();
1381    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1382    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1383    /// batch.first().all_ticks()
1384    /// # }, |mut stream| async move {
1385    /// // 1
1386    /// # assert_eq!(stream.next().await.unwrap(), 1);
1387    /// # }));
1388    /// # }
1389    /// ```
1390    pub fn first(self) -> Optional<T, L, B>
1391    where
1392        O: IsOrdered,
1393    {
1394        self.make_totally_ordered()
1395            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1396            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1397            .reduce(q!(|_, _| {}))
1398    }
1399
1400    /// Computes the last element in the stream as an [`Optional`], which
1401    /// will be empty until an element in the input arrives.
1402    ///
1403    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1404    /// re-ordering of elements may cause the last element to change.
1405    ///
1406    /// # Example
1407    /// ```rust
1408    /// # #[cfg(feature = "deploy")] {
1409    /// # use hydro_lang::prelude::*;
1410    /// # use futures::StreamExt;
1411    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1412    /// let tick = process.tick();
1413    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1414    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1415    /// batch.last().all_ticks()
1416    /// # }, |mut stream| async move {
1417    /// // 4
1418    /// # assert_eq!(stream.next().await.unwrap(), 4);
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn last(self) -> Optional<T, L, B>
1423    where
1424        O: IsOrdered,
1425    {
1426        self.make_totally_ordered()
1427            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1428            .reduce(q!(|curr, new| *curr = new))
1429    }
1430
1431    /// Returns a stream containing at most the first `n` elements of the input stream,
1432    /// preserving the original order. Similar to `LIMIT` in SQL.
1433    ///
1434    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1435    /// retries, since the result depends on the order and cardinality of elements.
1436    ///
1437    /// # Example
1438    /// ```rust
1439    /// # #[cfg(feature = "deploy")] {
1440    /// # use hydro_lang::prelude::*;
1441    /// # use futures::StreamExt;
1442    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1443    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1444    /// numbers.limit(q!(3))
1445    /// # }, |mut stream| async move {
1446    /// // 10, 20, 30
1447    /// # for w in vec![10, 20, 30] {
1448    /// #     assert_eq!(stream.next().await.unwrap(), w);
1449    /// # }
1450    /// # }));
1451    /// # }
1452    /// ```
1453    pub fn limit(
1454        self,
1455        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1456    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1457    where
1458        O: IsOrdered,
1459        R: IsExactlyOnce,
1460    {
1461        self.generator(
1462            q!(|| 0usize),
1463            q!(move |count, item| {
1464                if *count == n {
1465                    Generate::Break
1466                } else {
1467                    *count += 1;
1468                    if *count == n {
1469                        Generate::Return(item)
1470                    } else {
1471                        Generate::Yield(item)
1472                    }
1473                }
1474            }),
1475        )
1476    }
1477
1478    /// Collects all the elements of this stream into a single [`Vec`] element.
1479    ///
1480    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1481    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1482    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1483    /// the vector at an arbitrary point in time.
1484    ///
1485    /// # Example
1486    /// ```rust
1487    /// # #[cfg(feature = "deploy")] {
1488    /// # use hydro_lang::prelude::*;
1489    /// # use futures::StreamExt;
1490    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1491    /// let tick = process.tick();
1492    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1493    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1494    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1495    /// # }, |mut stream| async move {
1496    /// // [ vec![1, 2, 3, 4] ]
1497    /// # for w in vec![vec![1, 2, 3, 4]] {
1498    /// #     assert_eq!(stream.next().await.unwrap(), w);
1499    /// # }
1500    /// # }));
1501    /// # }
1502    /// ```
1503    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1504    where
1505        O: IsOrdered,
1506        R: IsExactlyOnce,
1507    {
1508        self.make_totally_ordered().make_exactly_once().fold(
1509            q!(|| vec![]),
1510            q!(|acc, v| {
1511                acc.push(v);
1512            }),
1513        )
1514    }
1515
1516    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1517    /// and emitting each intermediate result.
1518    ///
1519    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1520    /// containing all intermediate accumulated values. The scan operation can also terminate early
1521    /// by returning `None`.
1522    ///
1523    /// The function takes a mutable reference to the accumulator and the current element, and returns
1524    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1525    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1526    ///
1527    /// # Examples
1528    ///
1529    /// Basic usage - running sum:
1530    /// ```rust
1531    /// # #[cfg(feature = "deploy")] {
1532    /// # use hydro_lang::prelude::*;
1533    /// # use futures::StreamExt;
1534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1535    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1536    ///     q!(|| 0),
1537    ///     q!(|acc, x| {
1538    ///         *acc += x;
1539    ///         Some(*acc)
1540    ///     }),
1541    /// )
1542    /// # }, |mut stream| async move {
1543    /// // Output: 1, 3, 6, 10
1544    /// # for w in vec![1, 3, 6, 10] {
1545    /// #     assert_eq!(stream.next().await.unwrap(), w);
1546    /// # }
1547    /// # }));
1548    /// # }
1549    /// ```
1550    ///
1551    /// Early termination example:
1552    /// ```rust
1553    /// # #[cfg(feature = "deploy")] {
1554    /// # use hydro_lang::prelude::*;
1555    /// # use futures::StreamExt;
1556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1557    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1558    ///     q!(|| 1),
1559    ///     q!(|state, x| {
1560    ///         *state = *state * x;
1561    ///         if *state > 6 {
1562    ///             None // Terminate the stream
1563    ///         } else {
1564    ///             Some(-*state)
1565    ///         }
1566    ///     }),
1567    /// )
1568    /// # }, |mut stream| async move {
1569    /// // Output: -1, -2, -6
1570    /// # for w in vec![-1, -2, -6] {
1571    /// #     assert_eq!(stream.next().await.unwrap(), w);
1572    /// # }
1573    /// # }));
1574    /// # }
1575    /// ```
1576    pub fn scan<A, U, I, F>(
1577        self,
1578        init: impl IntoQuotedMut<'a, I, L>,
1579        f: impl IntoQuotedMut<'a, F, L>,
1580    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1581    where
1582        O: IsOrdered,
1583        R: IsExactlyOnce,
1584        I: Fn() -> A + 'a,
1585        F: Fn(&mut A, T) -> Option<U> + 'a,
1586    {
1587        let init = init.splice_fn0_ctx(&self.location).into();
1588        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1589
1590        Stream::new(
1591            self.location.clone(),
1592            HydroNode::Scan {
1593                init,
1594                acc: f,
1595                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1596                metadata: self.location.new_node_metadata(
1597                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1598                ),
1599            },
1600        )
1601    }
1602
1603    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1604    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1605    /// by the function.
1606    ///
1607    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1608    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1609    /// emitted. If it resolves to `None`, the item is filtered out.
1610    ///
1611    /// # Examples
1612    ///
1613    /// ```rust
1614    /// # #[cfg(feature = "deploy")] {
1615    /// # use hydro_lang::prelude::*;
1616    /// # use futures::StreamExt;
1617    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1618    /// process
1619    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1620    ///     .scan_async_blocking(
1621    ///         q!(|| 0),
1622    ///         q!(|acc, x| {
1623    ///             *acc += x;
1624    ///             let val = *acc;
1625    ///             async move { Some(val) }
1626    ///         }),
1627    ///     )
1628    /// # }, |mut stream| async move {
1629    /// // Output: 1, 3, 6, 10
1630    /// # for w in vec![1, 3, 6, 10] {
1631    /// #     assert_eq!(stream.next().await.unwrap(), w);
1632    /// # }
1633    /// # }));
1634    /// # }
1635    /// ```
1636    pub fn scan_async_blocking<A, U, I, F, Fut>(
1637        self,
1638        init: impl IntoQuotedMut<'a, I, L>,
1639        f: impl IntoQuotedMut<'a, F, L>,
1640    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1641    where
1642        O: IsOrdered,
1643        R: IsExactlyOnce,
1644        I: Fn() -> A + 'a,
1645        F: Fn(&mut A, T) -> Fut + 'a,
1646        Fut: Future<Output = Option<U>> + 'a,
1647    {
1648        let init = init.splice_fn0_ctx(&self.location).into();
1649        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1650
1651        Stream::new(
1652            self.location.clone(),
1653            HydroNode::ScanAsyncBlocking {
1654                init,
1655                acc: f,
1656                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1657                metadata: self.location.new_node_metadata(
1658                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1659                ),
1660            },
1661        )
1662    }
1663
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed:
    ///
    /// - [`Generate::Yield`] emits a value and keeps processing inputs.
    /// - [`Generate::Continue`] emits nothing and keeps processing inputs.
    /// - [`Generate::Return`] emits a final value; no later input is processed.
    /// - [`Generate::Break`] emits nothing and stops processing immediately.
    ///
    /// The initializer is invoked lazily, when the first input element arrives.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures so they can be referenced from inside the staged `scan_f`
        // closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan emits `Option<U>` per input (`Some(None)` = nothing to emit for this input);
        // returning `None` from the scan closure ends the scan itself.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        // The final value still has to be emitted, so mark the state as
                        // terminated here and end the scan on the next input instead.
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on Break
                    // because there is only one state (no other keys that still need
                    // processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the per-input `Option<U>` so that only the `Some` payloads reach the output.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1776
1777    /// Given a time interval, returns a stream corresponding to samples taken from the
1778    /// stream roughly at that interval. The output will have elements in the same order
1779    /// as the input, but with arbitrary elements skipped between samples. There is also
1780    /// no guarantee on the exact timing of the samples.
1781    ///
1782    /// # Non-Determinism
1783    /// The output stream is non-deterministic in which elements are sampled, since this
1784    /// is controlled by a clock.
1785    pub fn sample_every(
1786        self,
1787        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1788        nondet: NonDet,
1789    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1790    where
1791        L: NoTick + NoAtomic,
1792    {
1793        let samples = self.location.source_interval(interval, nondet);
1794
1795        let tick = self.location.tick();
1796        self.batch(&tick, nondet)
1797            .filter_if(samples.batch(&tick, nondet).first().is_some())
1798            .all_ticks()
1799            .weaken_retries()
1800    }
1801
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value within that duration. The [`Optional`] also has a
    /// value when the stream has not emitted any value at all.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the `Instant` at which the most recent element was observed; the value stays
        // `None` until the first element arrives.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Snapshot the tracked timestamp inside a tick and compare it against the clock at
        // that moment.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    // Nothing has ever been received, which is reported as a timeout.
                    Some(())
                }
            }))
            .latest()
    }
1846
1847    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1848    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1849    ///
1850    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1851    /// processed before an acknowledgement is emitted.
1852    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1853        let id = self.location.flow_state().borrow_mut().next_clock_id();
1854        let out_location = Atomic {
1855            tick: Tick {
1856                id,
1857                l: self.location.clone(),
1858            },
1859        };
1860        Stream::new(
1861            out_location.clone(),
1862            HydroNode::BeginAtomic {
1863                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1864                metadata: out_location
1865                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1866            },
1867        )
1868    }
1869
1870    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1871    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1872    /// the order of the input. The output stream will execute in the [`Tick`] that was
1873    /// used to create the atomic section.
1874    ///
1875    /// # Non-Determinism
1876    /// The batch boundaries are non-deterministic and may change across executions.
1877    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1878        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1879        Stream::new(
1880            tick.clone(),
1881            HydroNode::Batch {
1882                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1883                metadata: tick
1884                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1885            },
1886        )
1887    }
1888
1889    /// An operator which allows you to "name" a `HydroNode`.
1890    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1891    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1892        {
1893            let mut node = self.ir_node.borrow_mut();
1894            let metadata = node.metadata_mut();
1895            metadata.tag = Some(name.to_owned());
1896        }
1897        self
1898    }
1899
1900    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
1901    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
1902    /// so uses must be carefully vetted.
1903    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
1904    where
1905        B: IsBounded,
1906    {
1907        Optional::new(
1908            self.location.clone(),
1909            HydroNode::Cast {
1910                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1911                metadata: self
1912                    .location
1913                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1914            },
1915        )
1916    }
1917
1918    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1919        if O::ORDERING_KIND == O2::ORDERING_KIND {
1920            Stream::new(
1921                self.location.clone(),
1922                self.ir_node.replace(HydroNode::Placeholder),
1923            )
1924        } else {
1925            panic!(
1926                "Runtime ordering {:?} did not match requested cast {:?}.",
1927                O::ORDERING_KIND,
1928                O2::ORDERING_KIND
1929            )
1930        }
1931    }
1932
1933    /// Explicitly "casts" the stream to a type with a different ordering
1934    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1935    /// by the type-system.
1936    ///
1937    /// # Non-Determinism
1938    /// This function is used as an escape hatch, and any mistakes in the
1939    /// provided ordering guarantee will propagate into the guarantees
1940    /// for the rest of the program.
1941    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1942        if O::ORDERING_KIND == O2::ORDERING_KIND {
1943            self.use_ordering_type()
1944        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1945            // We can always weaken the ordering guarantee
1946            Stream::new(
1947                self.location.clone(),
1948                HydroNode::Cast {
1949                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1950                    metadata: self
1951                        .location
1952                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1953                },
1954            )
1955        } else {
1956            Stream::new(
1957                self.location.clone(),
1958                HydroNode::ObserveNonDet {
1959                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1960                    trusted: false,
1961                    metadata: self
1962                        .location
1963                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1964                },
1965            )
1966        }
1967    }
1968
1969    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1970    // intermediate states will not be revealed
1971    fn assume_ordering_trusted_bounded<O2: Ordering>(
1972        self,
1973        nondet: NonDet,
1974    ) -> Stream<T, L, B, O2, R> {
1975        if B::BOUNDED {
1976            self.assume_ordering_trusted(nondet)
1977        } else {
1978            self.assume_ordering(nondet)
1979        }
1980    }
1981
1982    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1983    // is not observable
1984    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1985        self,
1986        _nondet: NonDet,
1987    ) -> Stream<T, L, B, O2, R> {
1988        if O::ORDERING_KIND == O2::ORDERING_KIND {
1989            Stream::new(
1990                self.location.clone(),
1991                self.ir_node.replace(HydroNode::Placeholder),
1992            )
1993        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1994            // We can always weaken the ordering guarantee
1995            Stream::new(
1996                self.location.clone(),
1997                HydroNode::Cast {
1998                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1999                    metadata: self
2000                        .location
2001                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2002                },
2003            )
2004        } else {
2005            Stream::new(
2006                self.location.clone(),
2007                HydroNode::ObserveNonDet {
2008                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2009                    trusted: true,
2010                    metadata: self
2011                        .location
2012                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2013                },
2014            )
2015        }
2016    }
2017
2018    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2019    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2020    /// which is always safe because that is the weakest possible guarantee.
2021    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2022        self.weaken_ordering::<NoOrder>()
2023    }
2024
2025    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2026    /// enforcing that `O2` is weaker than the input ordering guarantee.
2027    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2028        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2029        self.assume_ordering::<O2>(nondet)
2030    }
2031
2032    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2033    /// implies that `O == TotalOrder`.
2034    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2035    where
2036        O: IsOrdered,
2037    {
2038        self.assume_ordering(nondet!(/** no-op */))
2039    }
2040
2041    /// Explicitly "casts" the stream to a type with a different retries
2042    /// guarantee. Useful in unsafe code where the lack of retries cannot
2043    /// be proven by the type-system.
2044    ///
2045    /// # Non-Determinism
2046    /// This function is used as an escape hatch, and any mistakes in the
2047    /// provided retries guarantee will propagate into the guarantees
2048    /// for the rest of the program.
2049    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2050        if R::RETRIES_KIND == R2::RETRIES_KIND {
2051            Stream::new(
2052                self.location.clone(),
2053                self.ir_node.replace(HydroNode::Placeholder),
2054            )
2055        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2056            // We can always weaken the retries guarantee
2057            Stream::new(
2058                self.location.clone(),
2059                HydroNode::Cast {
2060                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2061                    metadata: self
2062                        .location
2063                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2064                },
2065            )
2066        } else {
2067            Stream::new(
2068                self.location.clone(),
2069                HydroNode::ObserveNonDet {
2070                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2071                    trusted: false,
2072                    metadata: self
2073                        .location
2074                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2075                },
2076            )
2077        }
2078    }
2079
2080    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2081    // is not observable
2082    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2083        if R::RETRIES_KIND == R2::RETRIES_KIND {
2084            Stream::new(
2085                self.location.clone(),
2086                self.ir_node.replace(HydroNode::Placeholder),
2087            )
2088        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2089            // We can always weaken the retries guarantee
2090            Stream::new(
2091                self.location.clone(),
2092                HydroNode::Cast {
2093                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2094                    metadata: self
2095                        .location
2096                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2097                },
2098            )
2099        } else {
2100            Stream::new(
2101                self.location.clone(),
2102                HydroNode::ObserveNonDet {
2103                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2104                    trusted: true,
2105                    metadata: self
2106                        .location
2107                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2108                },
2109            )
2110        }
2111    }
2112
2113    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2114    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2115    /// which is always safe because that is the weakest possible guarantee.
2116    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2117        self.weaken_retries::<AtLeastOnce>()
2118    }
2119
2120    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2121    /// enforcing that `R2` is weaker than the input retries guarantee.
2122    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2123        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2124        self.assume_retries::<R2>(nondet)
2125    }
2126
2127    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2128    /// implies that `R == ExactlyOnce`.
2129    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2130    where
2131        R: IsExactlyOnce,
2132    {
2133        self.assume_retries(nondet!(/** no-op */))
2134    }
2135
2136    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2137    /// implies that `B == Bounded`.
2138    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2139    where
2140        B: IsBounded,
2141    {
2142        self.weaken_boundedness()
2143    }
2144
2145    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2146    /// which implies that `B == Bounded`.
2147    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2148        if B::BOUNDED == B2::BOUNDED {
2149            Stream::new(
2150                self.location.clone(),
2151                self.ir_node.replace(HydroNode::Placeholder),
2152            )
2153        } else {
2154            // We can always weaken the boundedness
2155            Stream::new(
2156                self.location.clone(),
2157                HydroNode::Cast {
2158                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2159                    metadata: self
2160                        .location
2161                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2162                },
2163            )
2164        }
2165    }
2166}
2167
2168impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2169where
2170    L: Location<'a>,
2171{
2172    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2173    ///
2174    /// # Example
2175    /// ```rust
2176    /// # #[cfg(feature = "deploy")] {
2177    /// # use hydro_lang::prelude::*;
2178    /// # use futures::StreamExt;
2179    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2180    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2181    /// # }, |mut stream| async move {
2182    /// // 1, 2, 3
2183    /// # for w in vec![1, 2, 3] {
2184    /// #     assert_eq!(stream.next().await.unwrap(), w);
2185    /// # }
2186    /// # }));
2187    /// # }
2188    /// ```
2189    pub fn cloned(self) -> Stream<T, L, B, O, R>
2190    where
2191        T: Clone,
2192    {
2193        self.map(q!(|d| d.clone()))
2194    }
2195}
2196
2197impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2198where
2199    L: Location<'a>,
2200{
2201    /// Computes the number of elements in the stream as a [`Singleton`].
2202    ///
2203    /// # Example
2204    /// ```rust
2205    /// # #[cfg(feature = "deploy")] {
2206    /// # use hydro_lang::prelude::*;
2207    /// # use futures::StreamExt;
2208    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2209    /// let tick = process.tick();
2210    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2211    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2212    /// batch.count().all_ticks()
2213    /// # }, |mut stream| async move {
2214    /// // 4
2215    /// # assert_eq!(stream.next().await.unwrap(), 4);
2216    /// # }));
2217    /// # }
2218    /// ```
2219    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2220        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2221            /// Order does not affect eventual count, and also does not affect intermediate states.
2222        ))
2223        .fold(
2224            q!(|| 0usize),
2225            q!(
2226                |count, _| *count += 1,
2227                monotone = manual_proof!(/** += 1 is monotone */)
2228            ),
2229        )
2230    }
2231}
2232
2233impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2234    /// Produces a new stream that merges the elements of the two input streams.
2235    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2236    ///
2237    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2238    /// [`Bounded`], you can use [`Stream::chain`] instead.
2239    ///
2240    /// # Example
2241    /// ```rust
2242    /// # #[cfg(feature = "deploy")] {
2243    /// # use hydro_lang::prelude::*;
2244    /// # use futures::StreamExt;
2245    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2246    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2247    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2248    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2249    /// # }, |mut stream| async move {
2250    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2251    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2252    /// #     assert_eq!(stream.next().await.unwrap(), w);
2253    /// # }
2254    /// # }));
2255    /// # }
2256    /// ```
2257    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2258        self,
2259        other: Stream<T, L, Unbounded, O2, R2>,
2260    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2261    where
2262        R: MinRetries<R2>,
2263    {
2264        Stream::new(
2265            self.location.clone(),
2266            HydroNode::Chain {
2267                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2268                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2269                metadata: self.location.new_node_metadata(Stream::<
2270                    T,
2271                    L,
2272                    Unbounded,
2273                    NoOrder,
2274                    <R as MinRetries<R2>>::Min,
2275                >::collection_kind()),
2276            },
2277        )
2278    }
2279
2280    /// Deprecated: use [`Stream::merge_unordered`] instead.
2281    #[deprecated(note = "use `merge_unordered` instead")]
2282    pub fn interleave<O2: Ordering, R2: Retries>(
2283        self,
2284        other: Stream<T, L, Unbounded, O2, R2>,
2285    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2286    where
2287        R: MinRetries<R2>,
2288    {
2289        self.merge_unordered(other)
2290    }
2291}
2292
2293impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2294    /// Produces a new stream that combines the elements of the two input streams,
2295    /// preserving the relative order of elements within each input.
2296    ///
2297    /// # Non-Determinism
2298    /// The order in which elements *across* the two streams will be interleaved is
2299    /// non-deterministic, so the order of elements will vary across runs. If the output
2300    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2301    /// but emits an unordered stream. For deterministic first-then-second ordering on
2302    /// bounded streams, use [`Stream::chain`].
2303    ///
2304    /// # Example
2305    /// ```rust
2306    /// # #[cfg(feature = "deploy")] {
2307    /// # use hydro_lang::prelude::*;
2308    /// # use futures::StreamExt;
2309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2310    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2311    /// # process.source_iter(q!(vec![1, 3])).into();
2312    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2313    /// # }, |mut stream| async move {
2314    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2315    /// # for w in vec![1, 3, 2, 4] {
2316    /// #     assert_eq!(stream.next().await.unwrap(), w);
2317    /// # }
2318    /// # }));
2319    /// # }
2320    /// ```
2321    pub fn merge_ordered<R2: Retries>(
2322        self,
2323        other: Stream<T, L, B, TotalOrder, R2>,
2324        _nondet: NonDet,
2325    ) -> Stream<T, L, B, TotalOrder, <R as MinRetries<R2>>::Min>
2326    where
2327        R: MinRetries<R2>,
2328    {
2329        Stream::new(
2330            self.location.clone(),
2331            HydroNode::MergeOrdered {
2332                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2333                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2334                metadata: self.location.new_node_metadata(Stream::<
2335                    T,
2336                    L,
2337                    B,
2338                    TotalOrder,
2339                    <R as MinRetries<R2>>::Min,
2340                >::collection_kind()),
2341            },
2342        )
2343    }
2344}
2345
2346impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2347where
2348    L: Location<'a>,
2349{
2350    /// Produces a new stream that emits the input elements in sorted order.
2351    ///
2352    /// The input stream can have any ordering guarantee, but the output stream
2353    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2354    /// elements in the input stream are available, so it requires the input stream
2355    /// to be [`Bounded`].
2356    ///
2357    /// # Example
2358    /// ```rust
2359    /// # #[cfg(feature = "deploy")] {
2360    /// # use hydro_lang::prelude::*;
2361    /// # use futures::StreamExt;
2362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2363    /// let tick = process.tick();
2364    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2365    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2366    /// batch.sort().all_ticks()
2367    /// # }, |mut stream| async move {
2368    /// // 1, 2, 3, 4
2369    /// # for w in (1..5) {
2370    /// #     assert_eq!(stream.next().await.unwrap(), w);
2371    /// # }
2372    /// # }));
2373    /// # }
2374    /// ```
2375    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2376    where
2377        B: IsBounded,
2378        T: Ord,
2379    {
2380        let this = self.make_bounded();
2381        Stream::new(
2382            this.location.clone(),
2383            HydroNode::Sort {
2384                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2385                metadata: this
2386                    .location
2387                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2388            },
2389        )
2390    }
2391
2392    /// Produces a new stream that first emits the elements of the `self` stream,
2393    /// and then emits the elements of the `other` stream. The output stream has
2394    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2395    /// [`TotalOrder`] guarantee.
2396    ///
    /// The `self` stream must be [`Bounded`], because this operator blocks on it until all
    /// of its elements are available. The `other` stream may have any boundedness, which
    /// the output stream inherits.
2400    ///
2401    /// # Example
2402    /// ```rust
2403    /// # #[cfg(feature = "deploy")] {
2404    /// # use hydro_lang::prelude::*;
2405    /// # use futures::StreamExt;
2406    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2407    /// let tick = process.tick();
2408    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2409    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2410    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2411    /// # }, |mut stream| async move {
2412    /// // 2, 3, 4, 5, 1, 2, 3, 4
2413    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2414    /// #     assert_eq!(stream.next().await.unwrap(), w);
2415    /// # }
2416    /// # }));
2417    /// # }
2418    /// ```
2419    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2420        self,
2421        other: Stream<T, L, B2, O2, R2>,
2422    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2423    where
2424        B: IsBounded,
2425        O: MinOrder<O2>,
2426        R: MinRetries<R2>,
2427    {
2428        check_matching_location(&self.location, &other.location);
2429
2430        Stream::new(
2431            self.location.clone(),
2432            HydroNode::Chain {
2433                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2434                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2435                metadata: self.location.new_node_metadata(Stream::<
2436                    T,
2437                    L,
2438                    B2,
2439                    <O as MinOrder<O2>>::Min,
2440                    <R as MinRetries<R2>>::Min,
2441                >::collection_kind()),
2442            },
2443        )
2444    }
2445
2446    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2447    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2448    /// because this is compiled into a nested loop.
2449    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2450        self,
2451        other: Stream<T2, L, Bounded, O2, R>,
2452    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2453    where
2454        B: IsBounded,
2455        T: Clone,
2456        T2: Clone,
2457    {
2458        let this = self.make_bounded();
2459        check_matching_location(&this.location, &other.location);
2460
2461        Stream::new(
2462            this.location.clone(),
2463            HydroNode::CrossProduct {
2464                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2465                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2466                metadata: this.location.new_node_metadata(Stream::<
2467                    (T, T2),
2468                    L,
2469                    Bounded,
2470                    <O2 as MinOrder<O>>::Min,
2471                    R,
2472                >::collection_kind()),
2473            },
2474        )
2475    }
2476
2477    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2478    /// `self` used as the values for *each* key.
2479    ///
2480    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2481    /// values. For example, it can be used to send the same set of elements to several cluster
2482    /// members, if the membership information is available as a [`KeyedSingleton`].
2483    ///
2484    /// # Example
2485    /// ```rust
2486    /// # #[cfg(feature = "deploy")] {
2487    /// # use hydro_lang::prelude::*;
2488    /// # use futures::StreamExt;
2489    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2490    /// # let tick = process.tick();
2491    /// let keyed_singleton = // { 1: (), 2: () }
2492    /// # process
2493    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2494    /// #     .into_keyed()
2495    /// #     .batch(&tick, nondet!(/** test */))
2496    /// #     .first();
2497    /// let stream = // [ "a", "b" ]
2498    /// # process
2499    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2500    /// #     .batch(&tick, nondet!(/** test */));
2501    /// stream.repeat_with_keys(keyed_singleton)
2502    /// # .entries().all_ticks()
2503    /// # }, |mut stream| async move {
2504    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2505    /// # let mut results = Vec::new();
2506    /// # for _ in 0..4 {
2507    /// #     results.push(stream.next().await.unwrap());
2508    /// # }
2509    /// # results.sort();
2510    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2511    /// # }));
2512    /// # }
2513    /// ```
2514    pub fn repeat_with_keys<K, V2>(
2515        self,
2516        keys: KeyedSingleton<K, V2, L, Bounded>,
2517    ) -> KeyedStream<K, T, L, Bounded, O, R>
2518    where
2519        B: IsBounded,
2520        K: Clone,
2521        T: Clone,
2522    {
2523        keys.keys()
2524            .weaken_retries()
2525            .assume_ordering_trusted::<TotalOrder>(
2526                nondet!(/** keyed stream does not depend on ordering of keys */),
2527            )
2528            .cross_product_nested_loop(self.make_bounded())
2529            .into_keyed()
2530    }
2531
2532    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2533    /// execution until all results are available. The output order is based on when futures
2534    /// complete, and may be different than the input order.
2535    ///
2536    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2537    /// while futures are pending, this variant blocks until the futures resolve.
2538    ///
2539    /// # Example
2540    /// ```rust
2541    /// # #[cfg(feature = "deploy")] {
2542    /// # use std::collections::HashSet;
2543    /// # use futures::StreamExt;
2544    /// # use hydro_lang::prelude::*;
2545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2546    /// process
2547    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2548    ///     .map(q!(|x| async move {
2549    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2550    ///         x
2551    ///     }))
2552    ///     .resolve_futures_blocking()
2553    /// #   },
2554    /// #   |mut stream| async move {
2555    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2556    /// #       let mut output = HashSet::new();
2557    /// #       for _ in 1..10 {
2558    /// #           output.insert(stream.next().await.unwrap());
2559    /// #       }
2560    /// #       assert_eq!(
2561    /// #           output,
2562    /// #           HashSet::<i32>::from_iter(1..10)
2563    /// #       );
2564    /// #   },
2565    /// # ));
2566    /// # }
2567    /// ```
2568    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2569    where
2570        T: Future,
2571    {
2572        Stream::new(
2573            self.location.clone(),
2574            HydroNode::ResolveFuturesBlocking {
2575                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2576                metadata: self
2577                    .location
2578                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2579            },
2580        )
2581    }
2582
2583    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2584    ///
2585    /// # Example
2586    /// ```rust
2587    /// # #[cfg(feature = "deploy")] {
2588    /// # use hydro_lang::prelude::*;
2589    /// # use futures::StreamExt;
2590    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2591    /// let tick = process.tick();
2592    /// let empty: Stream<i32, _, Bounded> = process
2593    ///   .source_iter(q!(Vec::<i32>::new()))
2594    ///   .batch(&tick, nondet!(/** test */));
2595    /// empty.is_empty().all_ticks()
2596    /// # }, |mut stream| async move {
2597    /// // true
2598    /// # assert_eq!(stream.next().await.unwrap(), true);
2599    /// # }));
2600    /// # }
2601    /// ```
2602    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2603    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2604    where
2605        B: IsBounded,
2606    {
2607        self.make_bounded()
2608            .assume_ordering_trusted::<TotalOrder>(
2609                nondet!(/** is_empty intermediates unaffected by order */),
2610            )
2611            .first()
2612            .is_none()
2613    }
2614}
2615
2616impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2617where
2618    L: Location<'a>,
2619{
2620    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2621    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2622    /// by equi-joining the two streams on the key attribute `K`.
2623    ///
2624    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2625    /// and streams the left side through, preserving the left side's ordering. When both
2626    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2627    ///
2628    /// # Example
2629    /// ```rust
2630    /// # #[cfg(feature = "deploy")] {
2631    /// # use hydro_lang::prelude::*;
2632    /// # use std::collections::HashSet;
2633    /// # use futures::StreamExt;
2634    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2635    /// let tick = process.tick();
2636    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2637    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2638    /// stream1.join(stream2)
2639    /// # }, |mut stream| async move {
2640    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2641    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2642    /// # stream.map(|i| assert!(expected.contains(&i)));
2643    /// # }));
2644    /// # }
2645    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2646        self,
2647        n: Stream<(K, V2), L, B2, O2, R2>,
2648    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2649    where
2650        K: Eq + Hash + Clone,
2651        R: MinRetries<R2>,
2652        V1: Clone,
2653        V2: Clone,
2654    {
2655        check_matching_location(&self.location, &n.location);
2656
2657        let ir_node = if B2::BOUNDED {
2658            HydroNode::JoinHalf {
2659                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2660                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2661                metadata: self.location.new_node_metadata(Stream::<
2662                    (K, (V1, V2)),
2663                    L,
2664                    B,
2665                    B2::PreserveOrderIfBounded<O>,
2666                    <R as MinRetries<R2>>::Min,
2667                >::collection_kind()),
2668            }
2669        } else {
2670            HydroNode::Join {
2671                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2672                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2673                metadata: self.location.new_node_metadata(Stream::<
2674                    (K, (V1, V2)),
2675                    L,
2676                    B,
2677                    B2::PreserveOrderIfBounded<O>,
2678                    <R as MinRetries<R2>>::Min,
2679                >::collection_kind()),
2680            }
2681        };
2682
2683        Stream::new(self.location.clone(), ir_node)
2684    }
2685
2686    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2687    /// computes the anti-join of the items in the input -- i.e. returns
2688    /// unique items in the first input that do not have a matching key
2689    /// in the second input.
2690    ///
2691    /// # Example
2692    /// ```rust
2693    /// # #[cfg(feature = "deploy")] {
2694    /// # use hydro_lang::prelude::*;
2695    /// # use futures::StreamExt;
2696    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2697    /// let tick = process.tick();
2698    /// let stream = process
2699    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2700    ///   .batch(&tick, nondet!(/** test */));
2701    /// let batch = process
2702    ///   .source_iter(q!(vec![1, 2]))
2703    ///   .batch(&tick, nondet!(/** test */));
2704    /// stream.anti_join(batch).all_ticks()
2705    /// # }, |mut stream| async move {
2706    /// # for w in vec![(3, 'c'), (4, 'd')] {
2707    /// #     assert_eq!(stream.next().await.unwrap(), w);
2708    /// # }
2709    /// # }));
    /// # }
    /// ```
2711    pub fn anti_join<O2: Ordering, R2: Retries>(
2712        self,
2713        n: Stream<K, L, Bounded, O2, R2>,
2714    ) -> Stream<(K, V1), L, B, O, R>
2715    where
2716        K: Eq + Hash,
2717    {
2718        check_matching_location(&self.location, &n.location);
2719
2720        Stream::new(
2721            self.location.clone(),
2722            HydroNode::AntiJoin {
2723                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2724                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2725                metadata: self
2726                    .location
2727                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2728            },
2729        )
2730    }
2731}
2732
2733impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2734    Stream<(K, V), L, B, O, R>
2735{
2736    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2737    /// is used as the key and the second element is added to the entries associated with that key.
2738    ///
2739    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2740    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2741    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2742    /// total ordering _within_ each group but no ordering _across_ groups.
2743    ///
2744    /// # Example
2745    /// ```rust
2746    /// # #[cfg(feature = "deploy")] {
2747    /// # use hydro_lang::prelude::*;
2748    /// # use futures::StreamExt;
2749    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2750    /// process
2751    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2752    ///     .into_keyed()
2753    /// #   .entries()
2754    /// # }, |mut stream| async move {
2755    /// // { 1: [2, 3], 2: [4] }
2756    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2757    /// #     assert_eq!(stream.next().await.unwrap(), w);
2758    /// # }
2759    /// # }));
2760    /// # }
2761    /// ```
2762    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2763        KeyedStream::new(
2764            self.location.clone(),
2765            HydroNode::Cast {
2766                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2767                metadata: self
2768                    .location
2769                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2770            },
2771        )
2772    }
2773}
2774
2775impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2776where
2777    K: Eq + Hash,
2778    L: Location<'a>,
2779{
2780    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2781    /// # Example
2782    /// ```rust
2783    /// # #[cfg(feature = "deploy")] {
2784    /// # use hydro_lang::prelude::*;
2785    /// # use futures::StreamExt;
2786    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2787    /// let tick = process.tick();
2788    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2789    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2790    /// batch.keys().all_ticks()
2791    /// # }, |mut stream| async move {
2792    /// // 1, 2
2793    /// # assert_eq!(stream.next().await.unwrap(), 1);
2794    /// # assert_eq!(stream.next().await.unwrap(), 2);
2795    /// # }));
2796    /// # }
2797    /// ```
2798    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2799        self.into_keyed()
2800            .fold(
2801                q!(|| ()),
2802                q!(
2803                    |_, _| {},
2804                    commutative = manual_proof!(/** values are ignored */),
2805                    idempotent = manual_proof!(/** values are ignored */)
2806                ),
2807            )
2808            .keys()
2809    }
2810}
2811
2812impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2813where
2814    L: Location<'a> + NoTick,
2815{
2816    /// Returns a stream corresponding to the latest batch of elements being atomically
2817    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2818    /// the order of the input.
2819    ///
2820    /// # Non-Determinism
2821    /// The batch boundaries are non-deterministic and may change across executions.
2822    pub fn batch_atomic(
2823        self,
2824        tick: &Tick<L>,
2825        _nondet: NonDet,
2826    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2827        Stream::new(
2828            tick.clone(),
2829            HydroNode::Batch {
2830                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2831                metadata: tick
2832                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2833            },
2834        )
2835    }
2836
2837    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2838    /// See [`Stream::atomic`] for more details.
2839    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2840        Stream::new(
2841            self.location.tick.l.clone(),
2842            HydroNode::EndAtomic {
2843                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2844                metadata: self
2845                    .location
2846                    .tick
2847                    .l
2848                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2849            },
2850        )
2851    }
2852}
2853
2854impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2855where
2856    L: Location<'a> + NoTick + NoAtomic,
2857    F: Future<Output = T>,
2858{
2859    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2860    /// Future outputs are produced as available, regardless of input arrival order.
2861    ///
2862    /// # Example
2863    /// ```rust
2864    /// # #[cfg(feature = "deploy")] {
2865    /// # use std::collections::HashSet;
2866    /// # use futures::StreamExt;
2867    /// # use hydro_lang::prelude::*;
2868    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2869    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2870    ///     .map(q!(|x| async move {
2871    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2872    ///         x
2873    ///     }))
2874    ///     .resolve_futures()
2875    /// #   },
2876    /// #   |mut stream| async move {
2877    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2878    /// #       let mut output = HashSet::new();
2879    /// #       for _ in 1..10 {
2880    /// #           output.insert(stream.next().await.unwrap());
2881    /// #       }
2882    /// #       assert_eq!(
2883    /// #           output,
2884    /// #           HashSet::<i32>::from_iter(1..10)
2885    /// #       );
2886    /// #   },
2887    /// # ));
    /// # }
    /// ```
2889    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2890        Stream::new(
2891            self.location.clone(),
2892            HydroNode::ResolveFutures {
2893                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2894                metadata: self
2895                    .location
2896                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2897            },
2898        )
2899    }
2900
2901    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2902    /// Future outputs are produced in the same order as the input stream.
2903    ///
2904    /// # Example
2905    /// ```rust
2906    /// # #[cfg(feature = "deploy")] {
2907    /// # use std::collections::HashSet;
2908    /// # use futures::StreamExt;
2909    /// # use hydro_lang::prelude::*;
2910    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2911    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2912    ///     .map(q!(|x| async move {
2913    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2914    ///         x
2915    ///     }))
2916    ///     .resolve_futures_ordered()
2917    /// #   },
2918    /// #   |mut stream| async move {
2919    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2920    /// #       let mut output = Vec::new();
2921    /// #       for _ in 1..10 {
2922    /// #           output.push(stream.next().await.unwrap());
2923    /// #       }
2924    /// #       assert_eq!(
2925    /// #           output,
2926    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2927    /// #       );
2928    /// #   },
2929    /// # ));
    /// # }
    /// ```
2931    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2932        Stream::new(
2933            self.location.clone(),
2934            HydroNode::ResolveFuturesOrdered {
2935                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2936                metadata: self
2937                    .location
2938                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2939            },
2940        )
2941    }
2942}
2943
2944impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2945where
2946    L: Location<'a>,
2947{
2948    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2949    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2950    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2951        Stream::new(
2952            self.location.outer().clone(),
2953            HydroNode::YieldConcat {
2954                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2955                metadata: self
2956                    .location
2957                    .outer()
2958                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2959            },
2960        )
2961    }
2962
2963    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2964    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2965    ///
2966    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2967    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2968    /// stream's [`Tick`] context.
2969    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2970        let out_location = Atomic {
2971            tick: self.location.clone(),
2972        };
2973
2974        Stream::new(
2975            out_location.clone(),
2976            HydroNode::YieldConcat {
2977                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978                metadata: out_location
2979                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2980            },
2981        )
2982    }
2983
2984    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
2986    /// input.
2987    ///
2988    /// This API is particularly useful for stateful computation on batches of data, such as
2989    /// maintaining an accumulated state that is up to date with the current batch.
2990    ///
2991    /// # Example
2992    /// ```rust
2993    /// # #[cfg(feature = "deploy")] {
2994    /// # use hydro_lang::prelude::*;
2995    /// # use futures::StreamExt;
2996    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2997    /// let tick = process.tick();
2998    /// # // ticks are lazy by default, forces the second tick to run
2999    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3000    /// # let batch_first_tick = process
3001    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3002    /// #  .batch(&tick, nondet!(/** test */));
3003    /// # let batch_second_tick = process
3004    /// #   .source_iter(q!(vec![5, 6, 7]))
3005    /// #   .batch(&tick, nondet!(/** test */))
3006    /// #   .defer_tick(); // appears on the second tick
3007    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3008    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3009    ///
3010    /// input.batch(&tick, nondet!(/** test */))
3011    ///     .across_ticks(|s| s.count()).all_ticks()
3012    /// # }, |mut stream| async move {
3013    /// // [4, 7]
3014    /// assert_eq!(stream.next().await.unwrap(), 4);
3015    /// assert_eq!(stream.next().await.unwrap(), 7);
3016    /// # }));
3017    /// # }
3018    /// ```
3019    pub fn across_ticks<Out: BatchAtomic>(
3020        self,
3021        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3022    ) -> Out::Batched {
3023        thunk(self.all_ticks_atomic()).batched_atomic()
3024    }
3025
3026    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3027    /// always has the elements of `self` at tick `T - 1`.
3028    ///
3029    /// At tick `0`, the output stream is empty, since there is no previous tick.
3030    ///
3031    /// This operator enables stateful iterative processing with ticks, by sending data from one
3032    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3033    ///
3034    /// # Example
3035    /// ```rust
3036    /// # #[cfg(feature = "deploy")] {
3037    /// # use hydro_lang::prelude::*;
3038    /// # use futures::StreamExt;
3039    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3040    /// let tick = process.tick();
3041    /// // ticks are lazy by default, forces the second tick to run
3042    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3043    ///
3044    /// let batch_first_tick = process
3045    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3046    ///   .batch(&tick, nondet!(/** test */));
3047    /// let batch_second_tick = process
3048    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3049    ///   .batch(&tick, nondet!(/** test */))
3050    ///   .defer_tick(); // appears on the second tick
3051    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3052    ///
3053    /// changes_across_ticks.clone().filter_not_in(
3054    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3055    /// ).all_ticks()
3056    /// # }, |mut stream| async move {
3057    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3058    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3059    /// #     assert_eq!(stream.next().await.unwrap(), w);
3060    /// # }
3061    /// # }));
3062    /// # }
3063    /// ```
3064    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3065        Stream::new(
3066            self.location.clone(),
3067            HydroNode::DeferTick {
3068                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3069                metadata: self
3070                    .location
3071                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3072            },
3073        )
3074    }
3075}
3076
3077#[cfg(test)]
3078mod tests {
3079    #[cfg(feature = "deploy")]
3080    use futures::{SinkExt, StreamExt};
3081    #[cfg(feature = "deploy")]
3082    use hydro_deploy::Deployment;
3083    #[cfg(feature = "deploy")]
3084    use serde::{Deserialize, Serialize};
3085    #[cfg(any(feature = "deploy", feature = "sim"))]
3086    use stageleft::q;
3087
3088    #[cfg(any(feature = "deploy", feature = "sim"))]
3089    use crate::compile::builder::FlowBuilder;
3090    #[cfg(feature = "deploy")]
3091    use crate::live_collections::sliced::sliced;
3092    #[cfg(feature = "deploy")]
3093    use crate::live_collections::stream::ExactlyOnce;
3094    #[cfg(feature = "sim")]
3095    use crate::live_collections::stream::NoOrder;
3096    #[cfg(any(feature = "deploy", feature = "sim"))]
3097    use crate::live_collections::stream::TotalOrder;
3098    #[cfg(any(feature = "deploy", feature = "sim"))]
3099    use crate::location::Location;
3100    #[cfg(feature = "sim")]
3101    use crate::networking::TCP;
3102    #[cfg(any(feature = "deploy", feature = "sim"))]
3103    use crate::nondet::nondet;
3104
3105    mod backtrace_chained_ops;
3106
/// Marker type used as a process tag in the deployment tests.
#[cfg(feature = "deploy")]
struct P1;
/// Second marker type used as a process/external tag in the deployment tests.
#[cfg(feature = "deploy")]
struct P2;
3111
/// Serializable wrapper around a single `u32`, used as the payload sent
/// between processes in the networking tests.
#[cfg(feature = "deploy")]
#[derive(Debug, Serialize, Deserialize)]
struct SendOverNetwork {
    /// The wrapped number.
    n: u32,
}
3117
/// Sends `0..10` from one process to a second over TCP, forwards the values to
/// an external client, and checks that they arrive in their original order.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_ten_distributed() {
    use crate::networking::TCP;

    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let sender = flow.process::<P1>();
    let relay = flow.process::<P2>();
    let client = flow.external::<P2>();

    // Wrap every number in a serializable struct before it crosses the network.
    let wrapped = sender
        .source_iter(q!(0..10))
        .map(q!(|n| SendOverNetwork { n }));
    let relayed_port = wrapped
        .send(&relay, TCP.fail_stop().bincode())
        .send_bincode_external(&client);

    let deployed = flow
        .with_process(&sender, deployment.Localhost())
        .with_process(&relay, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // The output port is connected before the deployment is started.
    let mut received = deployed.connect(relayed_port).await;

    deployment.start().await.unwrap();

    for expected in 0..10 {
        let message = received.next().await.unwrap();
        assert_eq!(message.n, expected);
    }
}
3152
/// Checks that taking `first()` of a three-element stream and counting the
/// result yields exactly `1`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let tick = node.tick();
    // Flatten the array singleton into a three-element ordered stream.
    let elements = tick
        .singleton(q!([1, 2, 3]))
        .into_stream()
        .flatten_ordered();
    let first_count = elements.first().into_stream().count();
    let count_port = first_count.all_ticks().send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut received = deployed.connect(count_port).await;

    deployment.start().await.unwrap();

    let observed = received.next().await.unwrap();
    assert_eq!(observed, 1);
}
3186
/// Checks that `reduce` over an external input keeps accumulating across
/// separately sent values: after sending 1 and then 2, the outputs are 1 and 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_reduce_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    let running_sum = input.reduce(q!(|acc, v| *acc += v));
    let out = running_sum
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, running sum expected afterwards).
    for (sent, expected) in [(1, 1), (2, 3)] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3220
/// Checks that crossing an external input stream with the fold of a bounded
/// top-level source pairs every input with the complete total (6).
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_cross_singleton() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    // Sum of the bounded source: 1 + 2 + 3.
    let total = node
        .source_iter(q!(vec![1, 2, 3]))
        .fold(q!(|| 0), q!(|acc, v| *acc += v));
    let out = input
        .cross_singleton(total)
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, output expected).
    for (sent, expected) in [(1, (1, 6)), (2, (2, 6))] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3258
/// Checks that the result of reducing a bounded top-level source, when used
/// inside a slice and turned back into a stream, has a count of exactly one
/// for every input element.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_reduce_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    let paired_with_count = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let out = paired_with_count.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, output expected).
    for (sent, expected) in [(1, (1, 1)), (2, (2, 1))] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3296
/// Same as the reduce-cardinality test, but converts the reduce result with
/// `into_singleton()` first; the count must still be exactly one per input.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_into_singleton_cardinality() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

    let paired_with_count = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    };
    let out = paired_with_count.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, output expected).
    for (sent, expected) in [(1, (1, 1)), (2, (2, 1))] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3334
/// Checks that a fold over an atomic bounded source, snapshotted into a tick,
/// contributes the full total (6) to each batch of external input.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn atomic_fold_replays_each_tick() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let tick = node.tick();

    let batched = input.batch(&tick, nondet!(/** test */));
    // Sum of the bounded source (1 + 2 + 3), observed inside the tick.
    let total_snapshot = node
        .source_iter(q!(vec![1, 2, 3]))
        .atomic()
        .fold(q!(|| 0), q!(|acc, v| *acc += v))
        .snapshot_atomic(&tick, nondet!(/** test */));
    let out = batched
        .cross_singleton(total_snapshot)
        .all_ticks()
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, output expected).
    for (sent, expected) in [(1, (1, 6)), (2, (2, 6))] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3377
/// Checks that `scan` over an external input carries its accumulator across
/// separately sent values: after sending 1 and then 2, the outputs are 1 and 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_scan_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    // Emit the running sum after every element.
    let running_sum = input.scan(
        q!(|| 0),
        q!(|acc, v| {
            *acc += v;
            Some(*acc)
        }),
    );
    let out = running_sum.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, running sum expected afterwards).
    for (sent, expected) in [(1, 1), (2, 3)] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3416
/// Checks that `enumerate` over an external input keeps counting across
/// separately sent values: the second element gets index 1, not 0.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_enumerate_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = node.source_external_bincode(&external);
    let indexed = input.enumerate();
    let out = indexed.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Each pair is (value sent, (index, value) expected).
    for (sent, expected) in [(1, (0, 1)), (2, (1, 2))] {
        to_node.send(sent).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), expected);
    }
}
3447
/// Checks that `unique` over an external input remembers previously seen
/// values: a repeated `1` is suppressed, so the next output after it is `3`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_unique_remembers_state() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let deduplicated = input.unique();
    let out = deduplicated.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_node = deployed.connect(input_port).await;
    let mut from_node = deployed.connect(out).await;

    deployment.start().await.unwrap();

    // Values seen for the first time pass straight through.
    for fresh in [1, 2] {
        to_node.send(fresh).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), fresh);
    }

    // A repeated 1 followed by a new 3: the next output must be the 3.
    to_node.send(1).await.unwrap();
    to_node.send(3).await.unwrap();
    assert_eq!(from_node.next().await.unwrap(), 3);
}
3483
/// Expected to panic: asserting that the first batch always contains all
/// three inputs does not hold, because batch sizes are non-deterministic.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

    let tick = node.tick();
    let batch_sizes = input.batch(&tick, nondet!(/** test */)).count();
    let out_recv = batch_sizes.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        for _ in 0..3 {
            in_send.send(());
        }

        // fails with nondet batching
        let first_batch_size = out_recv.next().await.unwrap();
        assert_eq!(first_batch_size, 3);
    });
}
3508
/// Checks that batching an input and re-emitting it across all ticks yields
/// the elements in the order they were sent.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_preserves_order() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();

    let tick = node.tick();
    let batched = input.batch(&tick, nondet!(/** test */));
    let out_recv = batched.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        for value in [1, 2, 3] {
            in_send.send(value);
        }

        out_recv.assert_yields_only([1, 2, 3]).await;
    });
}
3531
/// Expected to panic: the test panics in the instance where the per-batch
/// (min, max) pairs are `[(1, 3), (2, 2)]`, which the panic message treats as
/// evidence that batching shuffled the unordered input.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_unordered_shuffles() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    // Pair each batch's smallest element with its largest.
    let smallest = batch.clone().min();
    let largest = batch.max();
    let out_recv = smallest.zip(largest).all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3]);

        let observed = out_recv.collect::<Vec<_>>().await;
        if observed == vec![(1, 3), (2, 2)] {
            panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
        }
    });
}
3558
/// Checks how many instances the exhaustive simulator explores when four
/// unordered elements are batched and re-emitted.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let out_recv = input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
    });

    // ∑ (k=1 to 4) S(4,k) × k! = 75
    let expected_instances = 75;
    assert_eq!(instance_count, expected_instances);
}
3581
/// Expected to panic: once an unordered batch is treated as totally ordered
/// via `assume_ordering`, asserting the send order `[1, 2, 3, 4]` fails.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let assumed_ordered = batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = assumed_ordered.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        // fails with assume_ordering
        out_recv.assert_yields_only([1, 2, 3, 4]).await;
    });
}
3603
/// Checks how many instances the exhaustive simulator explores when the
/// order of four unordered, batched elements is observed.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let assumed_ordered = batch.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = assumed_ordered.all_ticks().sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        // The output only needs to be drained; its contents are not checked.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // 4! * 2^{4 - 1}
    let expected_instances = 192;
    assert_eq!(instance_count, expected_instances);
}
3629
/// Checks how many instances the exhaustive simulator explores when
/// snapshotting the running count of four unordered inputs, and that the
/// final snapshot in every instance is the full count of 4.
#[cfg(feature = "sim")]
#[test]
fn sim_unordered_count_instance_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let out_recv = input
        .count()
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        let snapshots = out_recv.collect::<Vec<_>>().await;
        // `assert_eq!` reports the actual final snapshot (or `None` when no
        // snapshot was produced) instead of a bare assertion/unwrap failure.
        assert_eq!(snapshots.last(), Some(&4));
    });

    assert_eq!(
        instance_count,
        16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
    )
}
3655
/// Checks that `assume_ordering` on a top-level unordered input of three
/// elements delivers all of them and is explored in 6 instances.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let assumed_ordered = input.assume_ordering::<TotalOrder>(nondet!(/** test */));
    let out_recv = assumed_ordered.sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3]);

        // Sort before comparing: only the set of delivered elements is checked.
        let mut delivered = out_recv.collect::<Vec<_>>().await;
        delivered.sort();
        assert_eq!(delivered, vec![1, 2, 3]);
    });

    assert_eq!(instance_count, 6);
}
3677
/// Feeds odd successors of the ordered stream back into its own input via a
/// second process, and checks that some explored instance starts with
/// `0, 1, 2` and that 6 instances are explored in total.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = input
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Increment every element, keep the odd results, and route them through
    // `node2` back to `node` to close the cycle.
    let odd_successors = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1));
    let round_trip = odd_successors
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(round_trip);

    let out_recv = ordered.sim_output();

    let mut saw_interleaving = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = out_recv.collect::<Vec<_>>().await;

        saw_interleaving |= observed.starts_with(&[0, 1, 2]);
    });

    assert!(
        saw_interleaving,
        "did not see an instance with 0, 1, 2 in order"
    );
    assert_eq!(instance_count, 6);
}
3716
/// Variant of the cycle-back test in which the fed-back stream first passes
/// through a tick batch; expects an instance starting with `0, 1, 2` and 58
/// explored instances in total.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let ordered = input
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Batch the ordered stream in a fresh tick and flatten it again before
    // computing the odd successors that are cycled back.
    let rebatched = ordered
        .clone()
        .batch(&node.tick(), nondet!(/** test */))
        .all_ticks();
    let odd_successors = rebatched.map(q!(|v| v + 1)).filter(q!(|v| v % 2 == 1));
    let round_trip = odd_successors
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(round_trip);

    let out_recv = ordered.sim_output();

    let mut saw_interleaving = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = out_recv.collect::<Vec<_>>().await;

        saw_interleaving |= observed.starts_with(&[0, 1, 2]);
    });

    assert!(
        saw_interleaving,
        "did not see an instance with 0, 1, 2 in order"
    );
    assert_eq!(instance_count, 58);
}
3757
/// Chains two `assume_ordering` points, with the second feeding back into the
/// first; expects an instance starting with `0, 3, 1` and 24 explored
/// instances in total.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_multiple() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();
    // The sender for the second input is discarded; nothing is ever sent on it.
    let (_, input2) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let input1_ordered = input
        .clone()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Shift the first ordered stream by 3, merge it with the second input,
    // and impose a second assumed ordering on the result.
    let shifted = input1_ordered
        .clone()
        .map(q!(|v| v + 3))
        .weaken_ordering::<NoOrder>();
    let foo = shifted
        .merge_unordered(input2)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Only the value 3 is routed back (via `node2`) into the first merge.
    let round_trip = foo
        .filter(q!(|v| *v == 3))
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(round_trip);

    let out_recv = input1_ordered.sim_output();

    let mut saw_interleaving = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 1]);
        let observed = out_recv.collect::<Vec<_>>().await;

        saw_interleaving |= observed.starts_with(&[0, 3, 1]);
    });

    assert!(
        saw_interleaving,
        "did not see an instance with 0, 3, 1 in order"
    );
    assert_eq!(instance_count, 24);
}
3802
/// Cycle-back test in which `assume_ordering` is applied inside an atomic
/// section; every instance must deliver four elements, and 22 instances are
/// explored in total.
#[cfg(feature = "sim")]
#[test]
fn sim_atomic_assume_ordering_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let merged = input.merge_unordered(cycle_back);
    let ordered = merged
        .atomic()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .end_atomic();

    // Increment every element, keep the odd results, and route them through
    // `node2` back to `node` to close the cycle.
    let odd_successors = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1));
    let round_trip = odd_successors
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    complete_cycle_back.complete(round_trip);

    let out_recv = ordered.sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.len(), 4);
    });

    assert_eq!(instance_count, 22);
}
3837
/// Checks that `partition` splits `1..=6` into its even and odd elements,
/// each delivered on its own external port.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn partition_evens_odds() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (evens, odds) = node
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .partition(q!(|x: &i32| x % 2 == 0));
    let evens_port = evens.send_bincode_external(&external);
    let odds_port = odds.send_bincode_external(&external);

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut evens_out = deployed.connect(evens_port).await;
    let mut odds_out = deployed.connect(odds_port).await;

    deployment.start().await.unwrap();

    // Results are sorted before comparison, so only membership is checked.
    let mut received_evens = Vec::new();
    for _ in 0..3 {
        let value = evens_out.next().await.unwrap();
        received_evens.push(value);
    }
    received_evens.sort();
    assert_eq!(received_evens, vec![2, 4, 6]);

    let mut received_odds = Vec::new();
    for _ in 0..3 {
        let value = odds_out.next().await.unwrap();
        received_odds.push(value);
    }
    received_odds.sort();
    assert_eq!(received_odds, vec![1, 3, 5]);
}
3878
/// Checks that an `inspect` whose output stream is never consumed still
/// executes, by reading the lines it prints from the process's stdout.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unconsumed_inspect_still_runs() {
    use crate::deploy::DeployCrateWrapper;

    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();

    let node = flow.process::<()>();

    // The stream returned by `.inspect()` is deliberately discarded here.
    // Prior to the Null-root fix, a pipeline like this silently did nothing.
    node.source_iter(q!(0..5))
        .inspect(q!(|x| println!("inspect: {}", x)));

    let deployed = flow
        .with_process(&node, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut stdout = deployed.get_process(&node).stdout();

    deployment.start().await.unwrap();

    let mut printed = Vec::new();
    for _ in 0..5 {
        let line = stdout.recv().await.unwrap();
        printed.push(line);
    }
    // Sort so the comparison does not depend on the order lines were received.
    printed.sort();

    let expected = vec![
        "inspect: 0",
        "inspect: 1",
        "inspect: 2",
        "inspect: 3",
        "inspect: 4",
    ];
    assert_eq!(printed, expected);
}
3920
/// Checks that `limit(3)` passes through only the first three of five inputs.
#[cfg(feature = "sim")]
#[test]
fn sim_limit() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();

    let limited = input.limit(q!(3));
    let out_recv = limited.sim_output();

    flow.sim().exhaustive(async || {
        for value in 1..=5 {
            in_send.send(value);
        }

        out_recv.assert_yields_only([1, 2, 3]).await;
    });
}
3941
/// Checks that `limit(0)` emits nothing, regardless of what is sent.
#[cfg(feature = "sim")]
#[test]
fn sim_limit_zero() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();

    let limited = input.limit(q!(0));
    let out_recv = limited.sim_output();

    flow.sim().exhaustive(async || {
        for value in [1, 2] {
            in_send.send(value);
        }

        out_recv.assert_yields_only::<i32, _>([]).await;
    });
}
3959
/// Checks that `merge_ordered` interleaves two inputs while keeping each
/// input's own order, that the interleaving `[1, 3, 2, 4]` is reachable, and
/// that 6 instances are explored in total.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let (in_send2, input2) = node.sim_input();

    let merged = input.merge_ordered(input2, nondet!(/** test */));
    let out_recv = merged.sim_output();

    let mut saw_out_of_order = false;
    let instances = flow.sim().exhaustive(async || {
        for value in [1, 2] {
            in_send.send(value);
        }
        for value in [3, 4] {
            in_send2.send(value);
        }

        let out = out_recv.collect::<Vec<_>>().await;

        saw_out_of_order |= out == [1, 3, 2, 4];

        // Split the output by originating input (values <= 2 came from the
        // first input) while keeping the observed relative order, which must
        // match the order each input was sent in.
        let (mut from_first, mut from_second): (Vec<_>, Vec<_>) =
            out.iter().copied().partition(|v| *v <= 2);
        assert_eq!(
            from_first,
            vec![1, 2],
            "first input order violated: {:?}",
            out
        );
        assert_eq!(
            from_second,
            vec![3, 4],
            "second input order violated: {:?}",
            out
        );

        // Together the two halves must account for every element sent.
        from_first.append(&mut from_second);
        from_first.sort();
        assert_eq!(from_first, vec![1, 2, 3, 4]);
    });

    assert!(saw_out_of_order);
    assert_eq!(instances, 6);
}
4011
4012    /// Tests that merge_ordered passes through elements when only one input
4013    /// has data.
4014    #[cfg(feature = "sim")]
4015    #[test]
4016    fn sim_merge_ordered_one_empty() {
4017        let mut flow = FlowBuilder::new();
4018        let node = flow.process::<()>();
4019
4020        let (in_send, input) = node.sim_input();
4021        let (_in_send2, input2) = node.sim_input();
4022
4023        let out_recv = input
4024            .merge_ordered(input2, nondet!(/** test */))
4025            .sim_output();
4026
4027        let instances = flow.sim().exhaustive(async || {
4028            in_send.send(1);
4029            in_send.send(2);
4030
4031            let out = out_recv.collect::<Vec<_>>().await;
4032            assert_eq!(out, vec![1, 2]);
4033        });
4034
4035        // Only one possible interleaving when one input is empty
4036        assert_eq!(instances, 1);
4037    }
4038
4039    /// Tests that merge_ordered correctly handles feedback cycles.
4040    /// An element output from merge_ordered is filtered and cycled back to
4041    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4042    /// element to arrive and potentially be emitted before elements still
4043    /// waiting on the other input.
4044    #[cfg(feature = "sim")]
4045    #[test]
4046    fn sim_merge_ordered_cycle_back() {
4047        let mut flow = FlowBuilder::new();
4048        let node = flow.process::<()>();
4049
4050        let (in_send, input) = node.sim_input();
4051
4052        // Create a forward ref for the cycle back
4053        let (complete_cycle_back, cycle_back) =
4054            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4055
4056        // merge_ordered: input (external) with cycle_back
4057        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4058
4059        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4060        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4061
4062        let out_recv = merged.sim_output();
4063
4064        // Send 1 and 2. Element 1 should cycle back as 10.
4065        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4066        let mut saw_cycle_before_second = false;
4067        flow.sim().exhaustive(async || {
4068            in_send.send(1);
4069            in_send.send(2);
4070
4071            let out = out_recv.collect::<Vec<_>>().await;
4072
4073            // 10 must always come after 1 (causal dependency)
4074            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4075            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4076            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4077
4078            // Check if we see [1, 10, 2] — the cycled element beats the second input
4079            if out == [1, 10, 2] {
4080                saw_cycle_before_second = true;
4081            }
4082
4083            let mut sorted = out;
4084            sorted.sort();
4085            assert_eq!(sorted, vec![1, 2, 10]);
4086        });
4087
4088        assert!(
4089            saw_cycle_before_second,
4090            "never saw the cycled element arrive before the second input element"
4091        );
4092    }
4093
4094    /// Tests that merge_ordered correctly interleaves when one input has a
4095    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4096    /// element 2 should be able to appear after b's elements.
4097    #[cfg(feature = "sim")]
4098    #[test]
4099    fn sim_merge_ordered_delayed() {
4100        let mut flow = FlowBuilder::new();
4101        let node = flow.process::<()>();
4102
4103        let (in_send, input) = node.sim_input();
4104        let (in_send2, input2) = node.sim_input();
4105
4106        let out_recv = input
4107            .merge_ordered(input2, nondet!(/** test */))
4108            .sim_output();
4109
4110        let mut saw_delayed_interleaving = false;
4111        flow.sim().exhaustive(async || {
4112            // Send 1 from a, and 3, 4 from b
4113            in_send.send(1);
4114            in_send2.send(3);
4115            in_send2.send(4);
4116
4117            // Collect what's available so far
4118            let first_batch = out_recv.collect::<Vec<_>>().await;
4119
4120            // Now send the delayed element 2 from a
4121            in_send.send(2);
4122            let second_batch = out_recv.collect::<Vec<_>>().await;
4123
4124            let mut all: Vec<_> = first_batch
4125                .iter()
4126                .chain(second_batch.iter())
4127                .copied()
4128                .collect();
4129
4130            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4131            if all == [1, 3, 4, 2] {
4132                saw_delayed_interleaving = true;
4133            }
4134
4135            all.sort();
4136            assert_eq!(all, vec![1, 2, 3, 4]);
4137        });
4138
4139        assert!(saw_delayed_interleaving);
4140    }
4141
4142    /// Deploy test: merge_ordered with a delayed element on one input.
4143    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4144    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4145    /// both inputs are pulled and the delayed element arrives later.
4146    #[cfg(feature = "deploy")]
4147    #[tokio::test]
4148    async fn deploy_merge_ordered_delayed() {
4149        let mut deployment = Deployment::new();
4150
4151        let mut flow = FlowBuilder::new();
4152        let node = flow.process::<()>();
4153        let external = flow.external::<()>();
4154
4155        let (input_a_port, input_a) = node.source_external_bincode(&external);
4156        let (input_b_port, input_b) = node.source_external_bincode(&external);
4157
4158        let out = input_a
4159            .assume_ordering(nondet!(/** test */))
4160            .merge_ordered(
4161                input_b.assume_ordering(nondet!(/** test */)),
4162                nondet!(/** test */),
4163            )
4164            .send_bincode_external(&external);
4165
4166        let nodes = flow
4167            .with_process(&node, deployment.Localhost())
4168            .with_external(&external, deployment.Localhost())
4169            .deploy(&mut deployment);
4170
4171        deployment.deploy().await.unwrap();
4172
4173        let mut ext_a = nodes.connect(input_a_port).await;
4174        let mut ext_b = nodes.connect(input_b_port).await;
4175        let mut ext_out = nodes.connect(out).await;
4176
4177        deployment.start().await.unwrap();
4178
4179        // Send a=1, b=3, b=4
4180        ext_a.send(1).await.unwrap();
4181        ext_b.send(3).await.unwrap();
4182        ext_b.send(4).await.unwrap();
4183
4184        // Collect the first 3 elements
4185        let mut received = Vec::new();
4186        for _ in 0..3 {
4187            received.push(ext_out.next().await.unwrap());
4188        }
4189
4190        // Now send the delayed a=2
4191        ext_a.send(2).await.unwrap();
4192        received.push(ext_out.next().await.unwrap());
4193
4194        // All elements should be present
4195        received.sort();
4196        assert_eq!(received, vec![1, 2, 3, 4]);
4197    }
4198
4199    #[cfg(feature = "deploy")]
4200    #[tokio::test]
4201    async fn monotone_fold_threshold() {
4202        use crate::properties::manual_proof;
4203
4204        let mut deployment = Deployment::new();
4205
4206        let mut flow = FlowBuilder::new();
4207        let node = flow.process::<()>();
4208        let external = flow.external::<()>();
4209
4210        let in_unbounded: super::Stream<_, _> =
4211            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4212        let sum = in_unbounded.fold(
4213            q!(|| 0),
4214            q!(
4215                |sum, v| {
4216                    *sum += v;
4217                },
4218                monotone = manual_proof!(/** test */)
4219            ),
4220        );
4221
4222        let threshold_out = sum
4223            .threshold_greater_or_equal(node.singleton(q!(7)))
4224            .send_bincode_external(&external);
4225
4226        let nodes = flow
4227            .with_process(&node, deployment.Localhost())
4228            .with_external(&external, deployment.Localhost())
4229            .deploy(&mut deployment);
4230
4231        deployment.deploy().await.unwrap();
4232
4233        let mut threshold_out = nodes.connect(threshold_out).await;
4234
4235        deployment.start().await.unwrap();
4236
4237        assert_eq!(threshold_out.next().await.unwrap(), 7);
4238    }
4239
4240    #[cfg(feature = "deploy")]
4241    #[tokio::test]
4242    async fn monotone_count_threshold() {
4243        let mut deployment = Deployment::new();
4244
4245        let mut flow = FlowBuilder::new();
4246        let node = flow.process::<()>();
4247        let external = flow.external::<()>();
4248
4249        let in_unbounded: super::Stream<_, _> =
4250            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4251        let sum = in_unbounded.count();
4252
4253        let threshold_out = sum
4254            .threshold_greater_or_equal(node.singleton(q!(3)))
4255            .send_bincode_external(&external);
4256
4257        let nodes = flow
4258            .with_process(&node, deployment.Localhost())
4259            .with_external(&external, deployment.Localhost())
4260            .deploy(&mut deployment);
4261
4262        deployment.deploy().await.unwrap();
4263
4264        let mut threshold_out = nodes.connect(threshold_out).await;
4265
4266        deployment.start().await.unwrap();
4267
4268        assert_eq!(threshold_out.next().await.unwrap(), 3);
4269    }
4270
4271    #[cfg(feature = "deploy")]
4272    #[tokio::test]
4273    async fn monotone_map_order_preserving_threshold() {
4274        use crate::properties::manual_proof;
4275
4276        let mut deployment = Deployment::new();
4277
4278        let mut flow = FlowBuilder::new();
4279        let node = flow.process::<()>();
4280        let external = flow.external::<()>();
4281
4282        let in_unbounded: super::Stream<_, _> =
4283            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4284        let sum = in_unbounded.fold(
4285            q!(|| 0),
4286            q!(
4287                |sum, v| {
4288                    *sum += v;
4289                },
4290                monotone = manual_proof!(/** test */)
4291            ),
4292        );
4293
4294        // map with order_preserving should preserve monotonicity
4295        let doubled = sum.map(q!(
4296            |v| v * 2,
4297            order_preserving = manual_proof!(/** doubling preserves order */)
4298        ));
4299
4300        let threshold_out = doubled
4301            .threshold_greater_or_equal(node.singleton(q!(14)))
4302            .send_bincode_external(&external);
4303
4304        let nodes = flow
4305            .with_process(&node, deployment.Localhost())
4306            .with_external(&external, deployment.Localhost())
4307            .deploy(&mut deployment);
4308
4309        deployment.deploy().await.unwrap();
4310
4311        let mut threshold_out = nodes.connect(threshold_out).await;
4312
4313        deployment.start().await.unwrap();
4314
4315        assert_eq!(threshold_out.next().await.unwrap(), 14);
4316    }
4317
4318    // === Compile-time type tests for join/cross_product ordering ===
4319
4320    #[cfg(any(feature = "deploy", feature = "sim"))]
4321    mod join_ordering_type_tests {
4322        use crate::live_collections::boundedness::{Bounded, Unbounded};
4323        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4324        use crate::location::{Location, Process};
4325
4326        #[expect(dead_code, reason = "compile-time type test")]
4327        fn join_unbounded_with_bounded_preserves_order<'a>(
4328            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4329            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4330        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4331            left.join(right)
4332        }
4333
4334        #[expect(dead_code, reason = "compile-time type test")]
4335        fn join_unbounded_with_unbounded_is_no_order<'a>(
4336            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4337            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4338        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4339            left.join(right)
4340        }
4341
4342        #[expect(dead_code, reason = "compile-time type test")]
4343        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4344            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4345            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4346        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4347            left.join(right)
4348        }
4349
4350        #[expect(dead_code, reason = "compile-time type test")]
4351        fn join_unbounded_noorder_with_bounded<'a>(
4352            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4353            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4354        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4355            left.join(right)
4356        }
4357
4358        // === Compile-time type tests for cross_product ordering ===
4359
4360        #[expect(dead_code, reason = "compile-time type test")]
4361        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4362            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4363            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4364        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4365            left.cross_product(right)
4366        }
4367
4368        #[expect(dead_code, reason = "compile-time type test")]
4369        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4370            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4371            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4372        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4373            left.cross_product(right)
4374        }
4375
4376        #[expect(dead_code, reason = "compile-time type test")]
4377        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4378            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4379            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4380        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4381            left.cross_product(right)
4382        }
4383    } // mod join_ordering_type_tests
4384
4385    // === Runtime correctness tests for bounded join/cross_product ===
4386
4387    #[cfg(feature = "sim")]
4388    #[test]
4389    fn cross_product_mixed_boundedness_correctness() {
4390        use stageleft::q;
4391
4392        use crate::compile::builder::FlowBuilder;
4393        use crate::nondet::nondet;
4394
4395        let mut flow = FlowBuilder::new();
4396        let process = flow.process::<()>();
4397        let tick = process.tick();
4398
4399        let left = process.source_iter(q!(vec![1, 2]));
4400        let right = process
4401            .source_iter(q!(vec!['a', 'b']))
4402            .batch(&tick, nondet!(/** test */))
4403            .all_ticks();
4404
4405        let out = left.cross_product(right).sim_output();
4406
4407        flow.sim().exhaustive(async || {
4408            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4409                .await;
4410        });
4411    }
4412
4413    #[cfg(feature = "sim")]
4414    #[test]
4415    fn join_mixed_boundedness_correctness() {
4416        use stageleft::q;
4417
4418        use crate::compile::builder::FlowBuilder;
4419        use crate::nondet::nondet;
4420
4421        let mut flow = FlowBuilder::new();
4422        let process = flow.process::<()>();
4423        let tick = process.tick();
4424
4425        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4426        let right = process
4427            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4428            .batch(&tick, nondet!(/** test */))
4429            .all_ticks();
4430
4431        let out = left.join(right).sim_output();
4432
4433        flow.sim().exhaustive(async || {
4434            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4435                .await;
4436        });
4437    }
4438}