Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45    /// The [`StreamOrder`] corresponding to this type.
46    const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82    /// The weaker of the two orderings.
83    type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88    type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93    type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99    MinRetries<Self, Min = Self>
100    + MinRetries<ExactlyOnce, Min = Self>
101    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103    /// The [`StreamRetry`] corresponding to this type.
104    const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136    /// The weaker of the two retry guarantees.
137    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142    type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147    type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153    label = "required here",
154    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166    label = "required here",
167    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192///   (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196    Type,
197    Loc,
198    Bound: Boundedness = Unbounded,
199    Order: Ordering = TotalOrder,
200    Retry: Retries = ExactlyOnce,
201> {
202    pub(crate) location: Loc,
203    pub(crate) ir_node: RefCell<HydroNode>,
204    pub(crate) flow_state: FlowState,
205
206    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244    for Stream<T, L, B, NoOrder, R>
245where
246    L: Location<'a>,
247{
248    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249        stream.weaken_ordering()
250    }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254    for Stream<T, L, B, O, AtLeastOnce>
255where
256    L: Location<'a>,
257{
258    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259        stream.weaken_retries()
260    }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265    L: Location<'a>,
266{
267    fn defer_tick(self) -> Self {
268        Stream::defer_tick(self)
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn location(&self) -> &Self::Location {
298        self.location()
299    }
300
301    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
302        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
303            location.clone(),
304            HydroNode::DeferTick {
305                input: Box::new(HydroNode::CycleSource {
306                    cycle_id,
307                    metadata: location.new_node_metadata(Self::collection_kind()),
308                }),
309                metadata: location.new_node_metadata(Self::collection_kind()),
310            },
311        );
312
313        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
314    }
315}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318    for Stream<T, Tick<L>, Bounded, O, R>
319where
320    L: Location<'a>,
321{
322    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323        assert_eq!(
324            Location::id(&self.location),
325            expected_location,
326            "locations do not match"
327        );
328        self.location
329            .flow_state()
330            .borrow_mut()
331            .push_root(HydroRoot::CycleSink {
332                cycle_id,
333                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334                op_metadata: HydroIrOpMetadata::new(),
335            });
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340    for Stream<T, L, B, O, R>
341where
342    L: Location<'a>,
343{
344    type Location = L;
345
346    fn create_source(cycle_id: CycleId, location: L) -> Self {
347        Stream::new(
348            location.clone(),
349            HydroNode::CycleSource {
350                cycle_id,
351                metadata: location.new_node_metadata(Self::collection_kind()),
352            },
353        )
354    }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358    for Stream<T, L, B, O, R>
359where
360    L: Location<'a>,
361{
362    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363        assert_eq!(
364            Location::id(&self.location),
365            expected_location,
366            "locations do not match"
367        );
368        self.location
369            .flow_state()
370            .borrow_mut()
371            .push_root(HydroRoot::CycleSink {
372                cycle_id,
373                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374                op_metadata: HydroIrOpMetadata::new(),
375            });
376    }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
380where
381    T: Clone,
382    L: Location<'a>,
383{
384    fn clone(&self) -> Self {
385        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
386            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
387            *self.ir_node.borrow_mut() = HydroNode::Tee {
388                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
389                metadata: self.location.new_node_metadata(Self::collection_kind()),
390            };
391        }
392
393        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
394            unreachable!()
395        };
396        Stream {
397            location: self.location.clone(),
398            flow_state: self.flow_state.clone(),
399            ir_node: HydroNode::Tee {
400                inner: SharedNode(inner.0.clone()),
401                metadata: metadata.clone(),
402            }
403            .into(),
404            _phantom: PhantomData,
405        }
406    }
407}
408
409impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
410where
411    L: Location<'a>,
412{
413    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
414        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
415        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
416
417        let flow_state = location.flow_state().clone();
418        Stream {
419            location,
420            flow_state,
421            ir_node: RefCell::new(ir_node),
422            _phantom: PhantomData,
423        }
424    }
425
426    /// Returns the [`Location`] where this stream is being materialized.
427    pub fn location(&self) -> &L {
428        &self.location
429    }
430
431    /// Weakens the consistency of this live collection to not guarantee any consistency across
432    /// cluster members (if this collection is on a cluster).
433    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
434    where
435        L: Location<'a>,
436    {
437        if L::consistency()
438            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
439        {
440            // already no consistency
441            Stream::new(
442                self.location.drop_consistency(),
443                self.ir_node.replace(HydroNode::Placeholder),
444            )
445        } else {
446            Stream::new(
447                self.location.drop_consistency(),
448                HydroNode::Cast {
449                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
451                        T,
452                        L::DropConsistency,
453                        B,
454                        O,
455                        R,
456                    >::collection_kind(
457                    )),
458                },
459            )
460        }
461    }
462
463    /// Casts this live collection to have the consistency guarantees specified in the given
464    /// location type parameter. The developer must ensure that the strengthened consistency
465    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
466    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
467        self,
468        _proof: impl crate::properties::ConsistencyProof,
469    ) -> Stream<T, L2, B, O, R>
470    where
471        L: Location<'a>,
472    {
473        if L::consistency() == L2::consistency() {
474            Stream::new(
475                self.location.with_consistency_of(),
476                self.ir_node.replace(HydroNode::Placeholder),
477            )
478        } else {
479            Stream::new(
480                self.location.with_consistency_of(),
481                HydroNode::AssertIsConsistent {
482                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483                    trusted: false,
484                    metadata: self
485                        .location
486                        .clone()
487                        .with_consistency_of::<L2>()
488                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
489                },
490            )
491        }
492    }
493
494    pub(crate) fn assert_has_consistency_of_trusted<
495        L2: Location<'a, DropConsistency = L::DropConsistency>,
496    >(
497        self,
498        _proof: impl crate::properties::ConsistencyProof,
499    ) -> Stream<T, L2, B, O, R>
500    where
501        L: Location<'a>,
502    {
503        if L::consistency() == L2::consistency() {
504            Stream::new(
505                self.location.with_consistency_of(),
506                self.ir_node.replace(HydroNode::Placeholder),
507            )
508        } else {
509            Stream::new(
510                self.location.with_consistency_of(),
511                HydroNode::AssertIsConsistent {
512                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513                    trusted: true,
514                    metadata: self
515                        .location
516                        .clone()
517                        .with_consistency_of::<L2>()
518                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
519                },
520            )
521        }
522    }
523
524    pub(crate) fn collection_kind() -> CollectionKind {
525        CollectionKind::Stream {
526            bound: B::BOUND_KIND,
527            order: O::ORDERING_KIND,
528            retry: R::RETRIES_KIND,
529            element_type: quote_type::<T>().into(),
530        }
531    }
532
533    /// Produces a stream based on invoking `f` on each element.
534    /// If you do not want to modify the stream and instead only want to view
535    /// each item use [`Stream::inspect`] instead.
536    ///
537    /// # Example
538    /// ```rust
539    /// # #[cfg(feature = "deploy")] {
540    /// # use hydro_lang::prelude::*;
541    /// # use futures::StreamExt;
542    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543    /// let words = process.source_iter(q!(vec!["hello", "world"]));
544    /// words.map(q!(|x| x.to_uppercase()))
545    /// # }, |mut stream| async move {
546    /// # for w in vec!["HELLO", "WORLD"] {
547    /// #     assert_eq!(stream.next().await.unwrap(), w);
548    /// # }
549    /// # }));
550    /// # }
551    /// ```
552    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
553    where
554        F: Fn(T) -> U + 'a,
555    {
556        let f = crate::singleton_ref::with_singleton_capture(|| {
557            f.splice_fn1_ctx(&self.location).into()
558        });
559        Stream::new(
560            self.location.clone(),
561            HydroNode::Map {
562                f,
563                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
564                metadata: self
565                    .location
566                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
567            },
568        )
569    }
570
571    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
572    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
573    /// for the output type `U` must produce items in a **deterministic** order.
574    ///
575    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
576    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
577    ///
578    /// # Example
579    /// ```rust
580    /// # #[cfg(feature = "deploy")] {
581    /// # use hydro_lang::prelude::*;
582    /// # use futures::StreamExt;
583    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
584    /// process
585    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
586    ///     .flat_map_ordered(q!(|x| x))
587    /// # }, |mut stream| async move {
588    /// // 1, 2, 3, 4
589    /// # for w in (1..5) {
590    /// #     assert_eq!(stream.next().await.unwrap(), w);
591    /// # }
592    /// # }));
593    /// # }
594    /// ```
595    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
596    where
597        I: IntoIterator<Item = U>,
598        F: Fn(T) -> I + 'a,
599    {
600        let f = crate::singleton_ref::with_singleton_capture(|| {
601            f.splice_fn1_ctx(&self.location).into()
602        });
603        Stream::new(
604            self.location.clone(),
605            HydroNode::FlatMap {
606                f,
607                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
608                metadata: self
609                    .location
610                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
611            },
612        )
613    }
614
615    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
616    /// for the output type `U` to produce items in any order.
617    ///
618    /// # Example
619    /// ```rust
620    /// # #[cfg(feature = "deploy")] {
621    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
622    /// # use futures::StreamExt;
623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
624    /// process
625    ///     .source_iter(q!(vec![
626    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
627    ///         std::collections::HashSet::from_iter(vec![3, 4]),
628    ///     ]))
629    ///     .flat_map_unordered(q!(|x| x))
630    /// # }, |mut stream| async move {
631    /// // 1, 2, 3, 4, but in no particular order
632    /// # let mut results = Vec::new();
633    /// # for w in (1..5) {
634    /// #     results.push(stream.next().await.unwrap());
635    /// # }
636    /// # results.sort();
637    /// # assert_eq!(results, vec![1, 2, 3, 4]);
638    /// # }));
639    /// # }
640    /// ```
641    pub fn flat_map_unordered<U, I, F>(
642        self,
643        f: impl IntoQuotedMut<'a, F, L>,
644    ) -> Stream<U, L, B, NoOrder, R>
645    where
646        I: IntoIterator<Item = U>,
647        F: Fn(T) -> I + 'a,
648    {
649        let f = crate::singleton_ref::with_singleton_capture(|| {
650            f.splice_fn1_ctx(&self.location).into()
651        });
652        Stream::new(
653            self.location.clone(),
654            HydroNode::FlatMap {
655                f,
656                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657                metadata: self
658                    .location
659                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
660            },
661        )
662    }
663
664    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
665    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
666    ///
667    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
668    /// not deterministic, use [`Stream::flatten_unordered`] instead.
669    ///
670    /// ```rust
671    /// # #[cfg(feature = "deploy")] {
672    /// # use hydro_lang::prelude::*;
673    /// # use futures::StreamExt;
674    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
675    /// process
676    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
677    ///     .flatten_ordered()
678    /// # }, |mut stream| async move {
679    /// // 1, 2, 3, 4
680    /// # for w in (1..5) {
681    /// #     assert_eq!(stream.next().await.unwrap(), w);
682    /// # }
683    /// # }));
684    /// # }
685    /// ```
686    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
687    where
688        T: IntoIterator<Item = U>,
689    {
690        self.flat_map_ordered(q!(|d| d))
691    }
692
693    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
694    /// for the element type `T` to produce items in any order.
695    ///
696    /// # Example
697    /// ```rust
698    /// # #[cfg(feature = "deploy")] {
699    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
700    /// # use futures::StreamExt;
701    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
702    /// process
703    ///     .source_iter(q!(vec![
704    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
705    ///         std::collections::HashSet::from_iter(vec![3, 4]),
706    ///     ]))
707    ///     .flatten_unordered()
708    /// # }, |mut stream| async move {
709    /// // 1, 2, 3, 4, but in no particular order
710    /// # let mut results = Vec::new();
711    /// # for w in (1..5) {
712    /// #     results.push(stream.next().await.unwrap());
713    /// # }
714    /// # results.sort();
715    /// # assert_eq!(results, vec![1, 2, 3, 4]);
716    /// # }));
717    /// # }
718    /// ```
719    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
720    where
721        T: IntoIterator<Item = U>,
722    {
723        self.flat_map_unordered(q!(|d| d))
724    }
725
726    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
727    /// then emit the elements of that stream one by one. When the inner stream yields
728    /// `Pending`, this operator yields as well.
729    pub fn flat_map_stream_blocking<U, S, F>(
730        self,
731        f: impl IntoQuotedMut<'a, F, L>,
732    ) -> Stream<U, L, B, O, R>
733    where
734        S: futures::Stream<Item = U>,
735        F: Fn(T) -> S + 'a,
736    {
737        let f = f.splice_fn1_ctx(&self.location).into();
738        Stream::new(
739            self.location.clone(),
740            HydroNode::FlatMapStreamBlocking {
741                f,
742                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
743                metadata: self
744                    .location
745                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
746            },
747        )
748    }
749
750    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
751    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
752    /// yields as well.
753    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
754    where
755        T: futures::Stream<Item = U>,
756    {
757        self.flat_map_stream_blocking(q!(|d| d))
758    }
759
760    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
761    /// `f`, preserving the order of the elements.
762    ///
763    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
764    /// not modify or take ownership of the values. If you need to modify the values while filtering
765    /// use [`Stream::filter_map`] instead.
766    ///
767    /// # Example
768    /// ```rust
769    /// # #[cfg(feature = "deploy")] {
770    /// # use hydro_lang::prelude::*;
771    /// # use futures::StreamExt;
772    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773    /// process
774    ///     .source_iter(q!(vec![1, 2, 3, 4]))
775    ///     .filter(q!(|&x| x > 2))
776    /// # }, |mut stream| async move {
777    /// // 3, 4
778    /// # for w in (3..5) {
779    /// #     assert_eq!(stream.next().await.unwrap(), w);
780    /// # }
781    /// # }));
782    /// # }
783    /// ```
784    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
785    where
786        F: Fn(&T) -> bool + 'a,
787    {
788        let f = crate::singleton_ref::with_singleton_capture(|| {
789            f.splice_fn1_borrow_ctx(&self.location).into()
790        });
791        Stream::new(
792            self.location.clone(),
793            HydroNode::Filter {
794                f,
795                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796                metadata: self.location.new_node_metadata(Self::collection_kind()),
797            },
798        )
799    }
800
801    /// Splits the stream into two streams based on a predicate, without cloning elements.
802    ///
803    /// Elements for which `f` returns `true` are sent to the first output stream,
804    /// and elements for which `f` returns `false` are sent to the second output stream.
805    ///
806    /// Unlike using `filter` twice, this only evaluates the predicate once per element
807    /// and does not require `T: Clone`.
808    ///
809    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
810    /// the predicate is only used for routing; the element itself is moved to the
811    /// appropriate output stream.
812    ///
813    /// # Example
814    /// ```rust
815    /// # #[cfg(feature = "deploy")] {
816    /// # use hydro_lang::prelude::*;
817    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
820    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
821    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
822    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
823    /// evens.map(q!(|x| (x, true)))
824    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
825    /// # }, |mut stream| async move {
826    /// # let mut results = Vec::new();
827    /// # for _ in 0..6 {
828    /// #     results.push(stream.next().await.unwrap());
829    /// # }
830    /// # results.sort();
831    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
832    /// # }));
833    /// # }
834    /// ```
835    #[expect(
836        clippy::type_complexity,
837        reason = "return type mirrors the input stream type"
838    )]
839    pub fn partition<F>(
840        self,
841        f: impl IntoQuotedMut<'a, F, L>,
842    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
843    where
844        F: Fn(&T) -> bool + 'a,
845    {
846        let f = crate::singleton_ref::with_singleton_capture(|| {
847            f.splice_fn1_borrow_ctx(&self.location).into()
848        });
849        let shared = SharedNode(Rc::new(RefCell::new(
850            self.ir_node.replace(HydroNode::Placeholder),
851        )));
852
853        let true_stream = Stream::new(
854            self.location.clone(),
855            HydroNode::Partition {
856                inner: SharedNode(shared.0.clone()),
857                f: f.clone(),
858                is_true: true,
859                metadata: self.location.new_node_metadata(Self::collection_kind()),
860            },
861        );
862
863        let false_stream = Stream::new(
864            self.location.clone(),
865            HydroNode::Partition {
866                inner: SharedNode(shared.0),
867                f,
868                is_true: false,
869                metadata: self.location.new_node_metadata(Self::collection_kind()),
870            },
871        );
872
873        (true_stream, false_stream)
874    }
875
876    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
877    ///
878    /// # Example
879    /// ```rust
880    /// # #[cfg(feature = "deploy")] {
881    /// # use hydro_lang::prelude::*;
882    /// # use futures::StreamExt;
883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884    /// process
885    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
886    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
887    /// # }, |mut stream| async move {
888    /// // 1, 2
889    /// # for w in (1..3) {
890    /// #     assert_eq!(stream.next().await.unwrap(), w);
891    /// # }
892    /// # }));
893    /// # }
894    /// ```
895    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
896    where
897        F: Fn(T) -> Option<U> + 'a,
898    {
899        let f = f.splice_fn1_ctx(&self.location).into();
900        Stream::new(
901            self.location.clone(),
902            HydroNode::FilterMap {
903                f,
904                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
905                metadata: self
906                    .location
907                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
908            },
909        )
910    }
911
912    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
913    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
914    /// If `other` is an empty [`Optional`], no values will be produced.
915    ///
916    /// # Example
917    /// ```rust
918    /// # #[cfg(feature = "deploy")] {
919    /// # use hydro_lang::prelude::*;
920    /// # use futures::StreamExt;
921    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922    /// let tick = process.tick();
923    /// let batch = process
924    ///   .source_iter(q!(vec![1, 2, 3, 4]))
925    ///   .batch(&tick, nondet!(/** test */));
926    /// let count = batch.clone().count(); // `count()` returns a singleton
927    /// batch.cross_singleton(count).all_ticks()
928    /// # }, |mut stream| async move {
929    /// // (1, 4), (2, 4), (3, 4), (4, 4)
930    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
931    /// #     assert_eq!(stream.next().await.unwrap(), w);
932    /// # }
933    /// # }));
934    /// # }
935    /// ```
936    pub fn cross_singleton<O2>(
937        self,
938        other: impl Into<Optional<O2, L, Bounded>>,
939    ) -> Stream<(T, O2), L, B, O, R>
940    where
941        O2: Clone,
942    {
943        let other: Optional<O2, L, Bounded> = other.into();
944        check_matching_location(&self.location, &other.location);
945
946        Stream::new(
947            self.location.clone(),
948            HydroNode::CrossSingleton {
949                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
950                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
951                metadata: self
952                    .location
953                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
954            },
955        )
956    }
957
958    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
959    ///
960    /// # Example
961    /// ```rust
962    /// # #[cfg(feature = "deploy")] {
963    /// # use hydro_lang::prelude::*;
964    /// # use futures::StreamExt;
965    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
966    /// let tick = process.tick();
967    /// // ticks are lazy by default, forces the second tick to run
968    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
969    ///
970    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
971    /// let batch_first_tick = process
972    ///   .source_iter(q!(vec![1, 2, 3, 4]))
973    ///   .batch(&tick, nondet!(/** test */));
974    /// let batch_second_tick = process
975    ///   .source_iter(q!(vec![5, 6, 7, 8]))
976    ///   .batch(&tick, nondet!(/** test */))
977    ///   .defer_tick();
978    /// batch_first_tick.chain(batch_second_tick)
979    ///   .filter_if(signal)
980    ///   .all_ticks()
981    /// # }, |mut stream| async move {
982    /// // [1, 2, 3, 4]
983    /// # for w in vec![1, 2, 3, 4] {
984    /// #     assert_eq!(stream.next().await.unwrap(), w);
985    /// # }
986    /// # }));
987    /// # }
988    /// ```
989    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
990        self.cross_singleton(signal.filter(q!(|b| *b)))
991            .map(q!(|(d, _)| d))
992    }
993
994    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
995    ///
996    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
997    /// leader of a cluster.
998    ///
999    /// # Example
1000    /// ```rust
1001    /// # #[cfg(feature = "deploy")] {
1002    /// # use hydro_lang::prelude::*;
1003    /// # use futures::StreamExt;
1004    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1005    /// let tick = process.tick();
1006    /// // ticks are lazy by default, forces the second tick to run
1007    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1008    ///
1009    /// let batch_first_tick = process
1010    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1011    ///   .batch(&tick, nondet!(/** test */));
1012    /// let batch_second_tick = process
1013    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1014    ///   .batch(&tick, nondet!(/** test */))
1015    ///   .defer_tick(); // appears on the second tick
1016    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1017    /// batch_first_tick.chain(batch_second_tick)
1018    ///   .filter_if_some(some_on_first_tick)
1019    ///   .all_ticks()
1020    /// # }, |mut stream| async move {
1021    /// // [1, 2, 3, 4]
1022    /// # for w in vec![1, 2, 3, 4] {
1023    /// #     assert_eq!(stream.next().await.unwrap(), w);
1024    /// # }
1025    /// # }));
1026    /// # }
1027    /// ```
1028    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1029    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1030        self.filter_if(signal.is_some())
1031    }
1032
1033    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1034    ///
1035    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1036    /// some local state.
1037    ///
1038    /// # Example
1039    /// ```rust
1040    /// # #[cfg(feature = "deploy")] {
1041    /// # use hydro_lang::prelude::*;
1042    /// # use futures::StreamExt;
1043    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044    /// let tick = process.tick();
1045    /// // ticks are lazy by default, forces the second tick to run
1046    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047    ///
1048    /// let batch_first_tick = process
1049    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1050    ///   .batch(&tick, nondet!(/** test */));
1051    /// let batch_second_tick = process
1052    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1053    ///   .batch(&tick, nondet!(/** test */))
1054    ///   .defer_tick(); // appears on the second tick
1055    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1056    /// batch_first_tick.chain(batch_second_tick)
1057    ///   .filter_if_none(some_on_first_tick)
1058    ///   .all_ticks()
1059    /// # }, |mut stream| async move {
1060    /// // [5, 6, 7, 8]
1061    /// # for w in vec![5, 6, 7, 8] {
1062    /// #     assert_eq!(stream.next().await.unwrap(), w);
1063    /// # }
1064    /// # }));
1065    /// # }
1066    /// ```
1067    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1068    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1069        self.filter_if(other.is_none())
1070    }
1071
1072    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1073    /// returning all tupled pairs.
1074    ///
1075    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1076    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1077    /// symmetric hash join is used and ordering is [`NoOrder`].
1078    ///
1079    /// # Example
1080    /// ```rust
1081    /// # #[cfg(feature = "deploy")] {
1082    /// # use hydro_lang::prelude::*;
1083    /// # use std::collections::HashSet;
1084    /// # use futures::StreamExt;
1085    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086    /// let tick = process.tick();
1087    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1088    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1089    /// stream1.cross_product(stream2)
1090    /// # }, |mut stream| async move {
1091    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1092    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1093    /// # stream.map(|i| assert!(expected.contains(&i)));
1094    /// # }));
1095    /// # }
1096    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
1097        self,
1098        other: Stream<T2, L, B2, O2, R>,
1099    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
1100    where
1101        T: Clone,
1102        T2: Clone,
1103    {
1104        self.map(q!(|v| ((), v)))
1105            .join(other.map(q!(|v| ((), v))))
1106            .map(q!(|((), (v1, v2))| (v1, v2)))
1107    }
1108
1109    /// Takes one stream as input and filters out any duplicate occurrences. The output
1110    /// contains all unique values from the input.
1111    ///
1112    /// # Example
1113    /// ```rust
1114    /// # #[cfg(feature = "deploy")] {
1115    /// # use hydro_lang::prelude::*;
1116    /// # use futures::StreamExt;
1117    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118    /// let tick = process.tick();
1119    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1120    /// # }, |mut stream| async move {
1121    /// # for w in vec![1, 2, 3, 4] {
1122    /// #     assert_eq!(stream.next().await.unwrap(), w);
1123    /// # }
1124    /// # }));
1125    /// # }
1126    /// ```
1127    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1128    where
1129        T: Eq + Hash,
1130    {
1131        Stream::new(
1132            self.location.clone(),
1133            HydroNode::Unique {
1134                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1135                metadata: self
1136                    .location
1137                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1138            },
1139        )
1140    }
1141
1142    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1143    ///
1144    /// The `other` stream must be [`Bounded`], since this function will wait until
1145    /// all its elements are available before producing any output.
1146    /// # Example
1147    /// ```rust
1148    /// # #[cfg(feature = "deploy")] {
1149    /// # use hydro_lang::prelude::*;
1150    /// # use futures::StreamExt;
1151    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1152    /// let tick = process.tick();
1153    /// let stream = process
1154    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1155    ///   .batch(&tick, nondet!(/** test */));
1156    /// let batch = process
1157    ///   .source_iter(q!(vec![1, 2]))
1158    ///   .batch(&tick, nondet!(/** test */));
1159    /// stream.filter_not_in(batch).all_ticks()
1160    /// # }, |mut stream| async move {
1161    /// # for w in vec![3, 4] {
1162    /// #     assert_eq!(stream.next().await.unwrap(), w);
1163    /// # }
1164    /// # }));
1165    /// # }
1166    /// ```
1167    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1168    where
1169        T: Eq + Hash,
1170        B2: IsBounded,
1171    {
1172        check_matching_location(&self.location, &other.location);
1173
1174        Stream::new(
1175            self.location.clone(),
1176            HydroNode::Difference {
1177                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1178                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1179                metadata: self
1180                    .location
1181                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1182            },
1183        )
1184    }
1185
1186    /// An operator which allows you to "inspect" each element of a stream without
1187    /// modifying it. The closure `f` is called on a reference to each item. This is
1188    /// mainly useful for debugging, and should not be used to generate side-effects.
1189    ///
1190    /// # Example
1191    /// ```rust
1192    /// # #[cfg(feature = "deploy")] {
1193    /// # use hydro_lang::prelude::*;
1194    /// # use futures::StreamExt;
1195    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1196    /// let nums = process.source_iter(q!(vec![1, 2]));
1197    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1198    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1199    /// # }, |mut stream| async move {
1200    /// # for w in vec![1, 2] {
1201    /// #     assert_eq!(stream.next().await.unwrap(), w);
1202    /// # }
1203    /// # }));
1204    /// # }
1205    /// ```
1206    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1207    where
1208        F: Fn(&T) + 'a,
1209    {
1210        let f = crate::singleton_ref::with_singleton_capture(|| {
1211            f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1212                .into()
1213        });
1214
1215        Stream::new(
1216            self.location.clone(),
1217            HydroNode::Inspect {
1218                f,
1219                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1220                metadata: self.location.new_node_metadata(Self::collection_kind()),
1221            },
1222        )
1223    }
1224
1225    /// Executes the provided closure for every element in this stream.
1226    ///
1227    /// Because the closure may have side effects, the stream must have deterministic order
1228    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1229    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1230    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1231    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1232    where
1233        O: IsOrdered,
1234        R: IsExactlyOnce,
1235    {
1236        let f = f.splice_fn1_ctx(&self.location).into();
1237        self.location
1238            .flow_state()
1239            .borrow_mut()
1240            .push_root(HydroRoot::ForEach {
1241                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242                f,
1243                op_metadata: HydroIrOpMetadata::new(),
1244            });
1245    }
1246
1247    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1248    /// TCP socket to some other server. You should _not_ use this API for interacting with
1249    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1250    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1251    /// interaction with asynchronous sinks.
1252    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1253    where
1254        O: IsOrdered,
1255        R: IsExactlyOnce,
1256        S: 'a + futures::Sink<T> + Unpin,
1257    {
1258        self.location
1259            .flow_state()
1260            .borrow_mut()
1261            .push_root(HydroRoot::DestSink {
1262                sink: sink.splice_typed_ctx(&self.location).into(),
1263                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1264                op_metadata: HydroIrOpMetadata::new(),
1265            });
1266    }
1267
1268    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # #[cfg(feature = "deploy")] {
1273    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1274    /// # use futures::StreamExt;
1275    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1276    /// let tick = process.tick();
1277    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278    /// numbers.enumerate()
1279    /// # }, |mut stream| async move {
1280    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1281    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1282    /// #     assert_eq!(stream.next().await.unwrap(), w);
1283    /// # }
1284    /// # }));
1285    /// # }
1286    /// ```
1287    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1288    where
1289        O: IsOrdered,
1290        R: IsExactlyOnce,
1291    {
1292        Stream::new(
1293            self.location.clone(),
1294            HydroNode::Enumerate {
1295                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1296                metadata: self.location.new_node_metadata(Stream::<
1297                    (usize, T),
1298                    L,
1299                    B,
1300                    TotalOrder,
1301                    ExactlyOnce,
1302                >::collection_kind()),
1303            },
1304        )
1305    }
1306
1307    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1308    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1309    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1310    ///
1311    /// Depending on the input stream guarantees, the closure may need to be commutative
1312    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1313    ///
1314    /// # Example
1315    /// ```rust
1316    /// # #[cfg(feature = "deploy")] {
1317    /// # use hydro_lang::prelude::*;
1318    /// # use futures::StreamExt;
1319    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1320    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1321    /// words
1322    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1323    ///     .into_stream()
1324    /// # }, |mut stream| async move {
1325    /// // "HELLOWORLD"
1326    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1327    /// # }));
1328    /// # }
1329    /// ```
1330    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1331        self,
1332        init: impl IntoQuotedMut<'a, I, L>,
1333        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1334    ) -> Singleton<A, L, B2>
1335    where
1336        I: Fn() -> A + 'a,
1337        F: Fn(&mut A, T),
1338        C: ValidCommutativityFor<O>,
1339        Idemp: ValidIdempotenceFor<R>,
1340        B: ApplyMonotoneStream<M, B2>,
1341    {
1342        let init = init.splice_fn0_ctx(&self.location).into();
1343        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1344        proof.register_proof(&comb);
1345
1346        // Only assume_retries (for idempotence), not assume_ordering.
1347        // The fold hook in the simulator handles ordering non-determinism directly.
1348        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1349        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1350
1351        let core = HydroNode::Fold {
1352            init,
1353            acc: comb.into(),
1354            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1355            metadata: retried
1356                .location
1357                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1358            // we do not guarantee consistency at this point because if the algebraic properties
1359            // do not hold in practice, replica consistency may fail to be maintained, so we
1360            // would like the simulator to assert consistency; in the future, this will be dynamic
1361            // based on the proof mechanism
1362        };
1363
1364        Singleton::new(retried.location.clone(), core)
1365            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1366    }
1367
1368    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1369    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1370    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1371    /// reference, so that it can be modified in place.
1372    ///
1373    /// Depending on the input stream guarantees, the closure may need to be commutative
1374    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1375    ///
1376    /// # Example
1377    /// ```rust
1378    /// # #[cfg(feature = "deploy")] {
1379    /// # use hydro_lang::prelude::*;
1380    /// # use futures::StreamExt;
1381    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1382    /// let bools = process.source_iter(q!(vec![false, true, false]));
1383    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1384    /// # }, |mut stream| async move {
1385    /// // true
1386    /// # assert_eq!(stream.next().await.unwrap(), true);
1387    /// # }));
1388    /// # }
1389    /// ```
1390    pub fn reduce<F, C, Idemp>(
1391        self,
1392        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1393    ) -> Optional<T, L, B>
1394    where
1395        F: Fn(&mut T, T) + 'a,
1396        C: ValidCommutativityFor<O>,
1397        Idemp: ValidIdempotenceFor<R>,
1398    {
1399        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1400        proof.register_proof(&f);
1401
1402        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1403        let ordered_etc: Stream<T, L::DropConsistency, B> =
1404            self.assume_retries(nondet).assume_ordering(nondet);
1405
1406        let core = HydroNode::Reduce {
1407            f: f.into(),
1408            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1409            metadata: ordered_etc
1410                .location
1411                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1412        };
1413
1414        Optional::new(ordered_etc.location.clone(), core)
1415            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1416    }
1417
1418    /// Computes the maximum element in the stream as an [`Optional`], which
1419    /// will be empty until the first element in the input arrives.
1420    ///
1421    /// # Example
1422    /// ```rust
1423    /// # #[cfg(feature = "deploy")] {
1424    /// # use hydro_lang::prelude::*;
1425    /// # use futures::StreamExt;
1426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1427    /// let tick = process.tick();
1428    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1429    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1430    /// batch.max().all_ticks()
1431    /// # }, |mut stream| async move {
1432    /// // 4
1433    /// # assert_eq!(stream.next().await.unwrap(), 4);
1434    /// # }));
1435    /// # }
1436    /// ```
1437    pub fn max(self) -> Optional<T, L, B>
1438    where
1439        T: Ord,
1440    {
1441        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1442            .assume_ordering_trusted_bounded::<TotalOrder>(
1443                nondet!(/** max is commutative, but order affects intermediates */),
1444            )
1445            .reduce(q!(|curr, new| {
1446                if new > *curr {
1447                    *curr = new;
1448                }
1449            }))
1450    }
1451
1452    /// Computes the minimum element in the stream as an [`Optional`], which
1453    /// will be empty until the first element in the input arrives.
1454    ///
1455    /// # Example
1456    /// ```rust
1457    /// # #[cfg(feature = "deploy")] {
1458    /// # use hydro_lang::prelude::*;
1459    /// # use futures::StreamExt;
1460    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461    /// let tick = process.tick();
1462    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1463    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1464    /// batch.min().all_ticks()
1465    /// # }, |mut stream| async move {
1466    /// // 1
1467    /// # assert_eq!(stream.next().await.unwrap(), 1);
1468    /// # }));
1469    /// # }
1470    /// ```
1471    pub fn min(self) -> Optional<T, L, B>
1472    where
1473        T: Ord,
1474    {
1475        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1476            .assume_ordering_trusted_bounded::<TotalOrder>(
1477                nondet!(/** max is commutative, but order affects intermediates */),
1478            )
1479            .reduce(q!(|curr, new| {
1480                if new < *curr {
1481                    *curr = new;
1482                }
1483            }))
1484    }
1485
1486    /// Computes the first element in the stream as an [`Optional`], which
1487    /// will be empty until the first element in the input arrives.
1488    ///
1489    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1490    /// re-ordering of elements may cause the first element to change.
1491    ///
1492    /// # Example
1493    /// ```rust
1494    /// # #[cfg(feature = "deploy")] {
1495    /// # use hydro_lang::prelude::*;
1496    /// # use futures::StreamExt;
1497    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1498    /// let tick = process.tick();
1499    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1500    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1501    /// batch.first().all_ticks()
1502    /// # }, |mut stream| async move {
1503    /// // 1
1504    /// # assert_eq!(stream.next().await.unwrap(), 1);
1505    /// # }));
1506    /// # }
1507    /// ```
1508    pub fn first(self) -> Optional<T, L, B>
1509    where
1510        O: IsOrdered,
1511    {
1512        self.make_totally_ordered()
1513            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1514            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1515            .reduce(q!(|_, _| {}))
1516    }
1517
1518    /// Computes the last element in the stream as an [`Optional`], which
1519    /// will be empty until an element in the input arrives.
1520    ///
1521    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1522    /// re-ordering of elements may cause the last element to change.
1523    ///
1524    /// # Example
1525    /// ```rust
1526    /// # #[cfg(feature = "deploy")] {
1527    /// # use hydro_lang::prelude::*;
1528    /// # use futures::StreamExt;
1529    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1530    /// let tick = process.tick();
1531    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1532    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1533    /// batch.last().all_ticks()
1534    /// # }, |mut stream| async move {
1535    /// // 4
1536    /// # assert_eq!(stream.next().await.unwrap(), 4);
1537    /// # }));
1538    /// # }
1539    /// ```
1540    pub fn last(self) -> Optional<T, L, B>
1541    where
1542        O: IsOrdered,
1543    {
1544        self.make_totally_ordered()
1545            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1546            .reduce(q!(|curr, new| *curr = new))
1547    }
1548
1549    /// Returns a stream containing at most the first `n` elements of the input stream,
1550    /// preserving the original order. Similar to `LIMIT` in SQL.
1551    ///
1552    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1553    /// retries, since the result depends on the order and cardinality of elements.
1554    ///
1555    /// # Example
1556    /// ```rust
1557    /// # #[cfg(feature = "deploy")] {
1558    /// # use hydro_lang::prelude::*;
1559    /// # use futures::StreamExt;
1560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1562    /// numbers.limit(q!(3))
1563    /// # }, |mut stream| async move {
1564    /// // 10, 20, 30
1565    /// # for w in vec![10, 20, 30] {
1566    /// #     assert_eq!(stream.next().await.unwrap(), w);
1567    /// # }
1568    /// # }));
1569    /// # }
1570    /// ```
1571    pub fn limit(
1572        self,
1573        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1574    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1575    where
1576        O: IsOrdered,
1577        R: IsExactlyOnce,
1578    {
1579        self.generator(
1580            q!(|| 0usize),
1581            q!(move |count, item| {
1582                if *count == n {
1583                    Generate::Break
1584                } else {
1585                    *count += 1;
1586                    if *count == n {
1587                        Generate::Return(item)
1588                    } else {
1589                        Generate::Yield(item)
1590                    }
1591                }
1592            }),
1593        )
1594    }
1595
1596    /// Collects all the elements of this stream into a single [`Vec`] element.
1597    ///
1598    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1599    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1600    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1601    /// the vector at an arbitrary point in time.
1602    ///
1603    /// # Example
1604    /// ```rust
1605    /// # #[cfg(feature = "deploy")] {
1606    /// # use hydro_lang::prelude::*;
1607    /// # use futures::StreamExt;
1608    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1609    /// let tick = process.tick();
1610    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1611    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1612    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1613    /// # }, |mut stream| async move {
1614    /// // [ vec![1, 2, 3, 4] ]
1615    /// # for w in vec![vec![1, 2, 3, 4]] {
1616    /// #     assert_eq!(stream.next().await.unwrap(), w);
1617    /// # }
1618    /// # }));
1619    /// # }
1620    /// ```
1621    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1622    where
1623        O: IsOrdered,
1624        R: IsExactlyOnce,
1625    {
1626        self.make_totally_ordered().make_exactly_once().fold(
1627            q!(|| vec![]),
1628            q!(|acc, v| {
1629                acc.push(v);
1630            }),
1631        )
1632    }
1633
1634    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1635    /// and emitting each intermediate result.
1636    ///
1637    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1638    /// containing all intermediate accumulated values. The scan operation can also terminate early
1639    /// by returning `None`.
1640    ///
1641    /// The function takes a mutable reference to the accumulator and the current element, and returns
1642    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1643    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1644    ///
1645    /// # Examples
1646    ///
1647    /// Basic usage - running sum:
1648    /// ```rust
1649    /// # #[cfg(feature = "deploy")] {
1650    /// # use hydro_lang::prelude::*;
1651    /// # use futures::StreamExt;
1652    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1653    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1654    ///     q!(|| 0),
1655    ///     q!(|acc, x| {
1656    ///         *acc += x;
1657    ///         Some(*acc)
1658    ///     }),
1659    /// )
1660    /// # }, |mut stream| async move {
1661    /// // Output: 1, 3, 6, 10
1662    /// # for w in vec![1, 3, 6, 10] {
1663    /// #     assert_eq!(stream.next().await.unwrap(), w);
1664    /// # }
1665    /// # }));
1666    /// # }
1667    /// ```
1668    ///
1669    /// Early termination example:
1670    /// ```rust
1671    /// # #[cfg(feature = "deploy")] {
1672    /// # use hydro_lang::prelude::*;
1673    /// # use futures::StreamExt;
1674    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1675    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1676    ///     q!(|| 1),
1677    ///     q!(|state, x| {
1678    ///         *state = *state * x;
1679    ///         if *state > 6 {
1680    ///             None // Terminate the stream
1681    ///         } else {
1682    ///             Some(-*state)
1683    ///         }
1684    ///     }),
1685    /// )
1686    /// # }, |mut stream| async move {
1687    /// // Output: -1, -2, -6
1688    /// # for w in vec![-1, -2, -6] {
1689    /// #     assert_eq!(stream.next().await.unwrap(), w);
1690    /// # }
1691    /// # }));
1692    /// # }
1693    /// ```
1694    pub fn scan<A, U, I, F>(
1695        self,
1696        init: impl IntoQuotedMut<'a, I, L>,
1697        f: impl IntoQuotedMut<'a, F, L>,
1698    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1699    where
1700        O: IsOrdered,
1701        R: IsExactlyOnce,
1702        I: Fn() -> A + 'a,
1703        F: Fn(&mut A, T) -> Option<U> + 'a,
1704    {
1705        let init = init.splice_fn0_ctx(&self.location).into();
1706        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1707
1708        Stream::new(
1709            self.location.clone(),
1710            HydroNode::Scan {
1711                init,
1712                acc: f,
1713                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1714                metadata: self.location.new_node_metadata(
1715                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1716                ),
1717            },
1718        )
1719    }
1720
1721    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1722    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1723    /// by the function.
1724    ///
1725    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1726    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1727    /// emitted. If it resolves to `None`, the item is filtered out.
1728    ///
1729    /// # Examples
1730    ///
1731    /// ```rust
1732    /// # #[cfg(feature = "deploy")] {
1733    /// # use hydro_lang::prelude::*;
1734    /// # use futures::StreamExt;
1735    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736    /// process
1737    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1738    ///     .scan_async_blocking(
1739    ///         q!(|| 0),
1740    ///         q!(|acc, x| {
1741    ///             *acc += x;
1742    ///             let val = *acc;
1743    ///             async move { Some(val) }
1744    ///         }),
1745    ///     )
1746    /// # }, |mut stream| async move {
1747    /// // Output: 1, 3, 6, 10
1748    /// # for w in vec![1, 3, 6, 10] {
1749    /// #     assert_eq!(stream.next().await.unwrap(), w);
1750    /// # }
1751    /// # }));
1752    /// # }
1753    /// ```
1754    pub fn scan_async_blocking<A, U, I, F, Fut>(
1755        self,
1756        init: impl IntoQuotedMut<'a, I, L>,
1757        f: impl IntoQuotedMut<'a, F, L>,
1758    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1759    where
1760        O: IsOrdered,
1761        R: IsExactlyOnce,
1762        I: Fn() -> A + 'a,
1763        F: Fn(&mut A, T) -> Fut + 'a,
1764        Fut: Future<Output = Option<U>> + 'a,
1765    {
1766        let init = init.splice_fn0_ctx(&self.location).into();
1767        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1768
1769        Stream::new(
1770            self.location.clone(),
1771            HydroNode::ScanAsyncBlocking {
1772                init,
1773                acc: f,
1774                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775                metadata: self.location.new_node_metadata(
1776                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1777                ),
1778            },
1779        )
1780    }
1781
1782    /// Iteratively processes the elements of the stream using a state machine that can yield
1783    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1784    /// syntax in Rust, without requiring special syntax.
1785    ///
1786    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1787    /// state. The second argument defines the processing logic, taking in a mutable reference
1788    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1789    /// variants define what is emitted and whether further inputs should be processed.
1790    ///
1791    /// # Example
1792    /// ```rust
1793    /// # #[cfg(feature = "deploy")] {
1794    /// # use hydro_lang::prelude::*;
1795    /// # use futures::StreamExt;
1796    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1797    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1798    ///     q!(|| 0),
1799    ///     q!(|acc, x| {
1800    ///         *acc += x;
1801    ///         if *acc > 100 {
1802    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1803    ///         } else if *acc % 2 == 0 {
1804    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1805    ///         } else {
1806    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1807    ///         }
1808    ///     }),
1809    /// )
1810    /// # }, |mut stream| async move {
1811    /// // Output: "even", "done!"
1812    /// # let mut results = Vec::new();
1813    /// # for _ in 0..2 {
1814    /// #     results.push(stream.next().await.unwrap());
1815    /// # }
1816    /// # results.sort();
1817    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1818    /// # }));
1819    /// # }
1820    /// ```
1821    pub fn generator<A, U, I, F>(
1822        self,
1823        init: impl IntoQuotedMut<'a, I, L> + Copy,
1824        f: impl IntoQuotedMut<'a, F, L> + Copy,
1825    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1826    where
1827        O: IsOrdered,
1828        R: IsExactlyOnce,
1829        I: Fn() -> A + 'a,
1830        F: Fn(&mut A, T) -> Generate<U> + 'a,
1831    {
1832        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1833        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1834
1835        let this = self.make_totally_ordered().make_exactly_once();
1836
1837        // State is Option<Option<A>>:
1838        //   None = not yet initialized
1839        //   Some(Some(a)) = active with state a
1840        //   Some(None) = terminated
1841        let scan_init = q!(|| None)
1842            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1843            .into();
1844        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1845            if state.is_none() {
1846                *state = Some(Some(init()));
1847            }
1848            match state {
1849                Some(Some(state_value)) => match f(state_value, v) {
1850                    Generate::Yield(out) => Some(Some(out)),
1851                    Generate::Return(out) => {
1852                        *state = Some(None);
1853                        Some(Some(out))
1854                    }
1855                    // Unlike KeyedStream, we can terminate the scan directly on
1856                    // Break/Return because there is only one state (no other keys
1857                    // that still need processing).
1858                    Generate::Break => None,
1859                    Generate::Continue => Some(None),
1860                },
1861                // State is Some(None) after Return; terminate the scan.
1862                _ => None,
1863            }
1864        })
1865        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1866        .into();
1867
1868        let scan_node = HydroNode::Scan {
1869            init: scan_init,
1870            acc: scan_f,
1871            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1872            metadata: this.location.new_node_metadata(Stream::<
1873                Option<U>,
1874                L,
1875                B,
1876                TotalOrder,
1877                ExactlyOnce,
1878            >::collection_kind()),
1879        };
1880
1881        let flatten_f = q!(|d| d)
1882            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1883            .into();
1884        let flatten_node = HydroNode::FlatMap {
1885            f: flatten_f,
1886            input: Box::new(scan_node),
1887            metadata: this
1888                .location
1889                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1890        };
1891
1892        Stream::new(this.location.clone(), flatten_node)
1893    }
1894
1895    /// Given a time interval, returns a stream corresponding to samples taken from the
1896    /// stream roughly at that interval. The output will have elements in the same order
1897    /// as the input, but with arbitrary elements skipped between samples. There is also
1898    /// no guarantee on the exact timing of the samples.
1899    ///
1900    /// # Non-Determinism
1901    /// The output stream is non-deterministic in which elements are sampled, since this
1902    /// is controlled by a clock.
1903    pub fn sample_every(
1904        self,
1905        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1906        nondet: NonDet,
1907    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1908    where
1909        L: TopLevel<'a>,
1910    {
1911        let samples = self.location.source_interval(interval);
1912
1913        let tick = self.location.tick();
1914        self.batch(&tick, nondet)
1915            .filter_if(samples.batch(&tick, nondet).first().is_some())
1916            .all_ticks()
1917            .weaken_retries()
1918    }
1919
1920    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1921    /// stream has not emitted a value since that duration.
1922    ///
1923    /// # Non-Determinism
1924    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1925    /// samples take place, timeouts may be non-deterministically generated or missed,
1926    /// and the notification of the timeout may be delayed as well. There is also no
1927    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1928    /// detected based on when the next sample is taken.
1929    pub fn timeout(
1930        self,
1931        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1932        nondet: NonDet,
1933    ) -> Optional<(), L::DropConsistency, Unbounded>
1934    where
1935        L: TopLevel<'a>,
1936    {
1937        let tick = self.location.tick();
1938
1939        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1940            q!(|| None),
1941            q!(
1942                |latest, _| {
1943                    *latest = Some(Instant::now());
1944                },
1945                commutative = manual_proof!(/** TODO */)
1946            ),
1947        );
1948
1949        latest_received
1950            .snapshot(&tick, nondet)
1951            .filter_map(q!(move |latest_received| {
1952                if let Some(latest_received) = latest_received {
1953                    if Instant::now().duration_since(latest_received) > duration {
1954                        Some(())
1955                    } else {
1956                        None
1957                    }
1958                } else {
1959                    Some(())
1960                }
1961            }))
1962            .latest()
1963    }
1964
1965    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1966    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1967    ///
1968    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1969    /// processed before an acknowledgement is emitted.
1970    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1971        let id = self.location.flow_state().borrow_mut().next_clock_id();
1972        let out_location = Atomic {
1973            tick: Tick {
1974                id,
1975                l: self.location.clone(),
1976            },
1977        };
1978        Stream::new(
1979            out_location.clone(),
1980            HydroNode::BeginAtomic {
1981                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1982                metadata: out_location
1983                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1984            },
1985        )
1986    }
1987
1988    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1989    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1990    /// the order of the input. The output stream will execute in the [`Tick`] that was
1991    /// used to create the atomic section.
1992    ///
1993    /// # Non-Determinism
1994    /// The batch boundaries are non-deterministic and may change across executions.
1995    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1996        self,
1997        tick: &Tick<L2>,
1998        _nondet: NonDet,
1999    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2000        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2001        Stream::new(
2002            tick.drop_consistency(),
2003            HydroNode::Batch {
2004                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2005                metadata: tick
2006                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2007            },
2008        )
2009    }
2010
2011    /// An operator which allows you to "name" a `HydroNode`.
2012    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2013    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2014        {
2015            let mut node = self.ir_node.borrow_mut();
2016            let metadata = node.metadata_mut();
2017            metadata.tag = Some(name.to_owned());
2018        }
2019        self
2020    }
2021
2022    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2023    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2024    /// so uses must be carefully vetted.
2025    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2026    where
2027        B: IsBounded,
2028    {
2029        Optional::new(
2030            self.location.clone(),
2031            HydroNode::Cast {
2032                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2033                metadata: self
2034                    .location
2035                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2036            },
2037        )
2038    }
2039
2040    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2041        if O::ORDERING_KIND == O2::ORDERING_KIND {
2042            Stream::new(
2043                self.location.clone(),
2044                self.ir_node.replace(HydroNode::Placeholder),
2045            )
2046        } else {
2047            panic!(
2048                "Runtime ordering {:?} did not match requested cast {:?}.",
2049                O::ORDERING_KIND,
2050                O2::ORDERING_KIND
2051            )
2052        }
2053    }
2054
2055    /// Explicitly "casts" the stream to a type with a different ordering
2056    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2057    /// by the type-system.
2058    ///
2059    /// # Non-Determinism
2060    /// This function is used as an escape hatch, and any mistakes in the
2061    /// provided ordering guarantee will propagate into the guarantees
2062    /// for the rest of the program.
2063    pub fn assume_ordering<O2: Ordering>(
2064        self,
2065        _nondet: NonDet,
2066    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2067        if O::ORDERING_KIND == O2::ORDERING_KIND {
2068            self.use_ordering_type().weaken_consistency()
2069        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2070            // We can always weaken the ordering guarantee
2071            let target_location = self.location().drop_consistency();
2072            Stream::new(
2073                target_location.clone(),
2074                HydroNode::Cast {
2075                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2076                    metadata: target_location
2077                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2078                },
2079            )
2080        } else {
2081            let target_location = self.location().drop_consistency();
2082            Stream::new(
2083                target_location.clone(),
2084                HydroNode::ObserveNonDet {
2085                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2086                    trusted: false,
2087                    metadata: target_location
2088                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2089                },
2090            )
2091        }
2092    }
2093
2094    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2095    // intermediate states will not be revealed
2096    fn assume_ordering_trusted_bounded<O2: Ordering>(
2097        self,
2098        nondet: NonDet,
2099    ) -> Stream<T, L, B, O2, R> {
2100        if B::BOUNDED {
2101            self.assume_ordering_trusted(nondet)
2102        } else {
2103            let self_location = self.location.clone();
2104            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2105            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2106        }
2107    }
2108
2109    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2110    // is not observable
2111    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2112        self,
2113        _nondet: NonDet,
2114    ) -> Stream<T, L, B, O2, R> {
2115        if O::ORDERING_KIND == O2::ORDERING_KIND {
2116            self.use_ordering_type()
2117        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2118            // We can always weaken the ordering guarantee
2119            Stream::new(
2120                self.location.clone(),
2121                HydroNode::Cast {
2122                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2123                    metadata: self
2124                        .location
2125                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2126                },
2127            )
2128        } else {
2129            Stream::new(
2130                self.location.clone(),
2131                HydroNode::ObserveNonDet {
2132                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2133                    trusted: true,
2134                    metadata: self
2135                        .location
2136                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2137                },
2138            )
2139        }
2140    }
2141
2142    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2143    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2144    /// which is always safe because that is the weakest possible guarantee.
2145    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2146        self.weaken_ordering::<NoOrder>()
2147    }
2148
2149    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2150    /// enforcing that `O2` is weaker than the input ordering guarantee.
2151    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2152        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2153        self.assume_ordering_trusted::<O2>(nondet)
2154    }
2155
2156    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2157    /// implies that `O == TotalOrder`.
2158    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2159    where
2160        O: IsOrdered,
2161    {
2162        self.assume_ordering_trusted(nondet!(/** no-op */))
2163    }
2164
2165    /// Explicitly "casts" the stream to a type with a different retries
2166    /// guarantee. Useful in unsafe code where the lack of retries cannot
2167    /// be proven by the type-system.
2168    ///
2169    /// # Non-Determinism
2170    /// This function is used as an escape hatch, and any mistakes in the
2171    /// provided retries guarantee will propagate into the guarantees
2172    /// for the rest of the program.
2173    pub fn assume_retries<R2: Retries>(
2174        self,
2175        _nondet: NonDet,
2176    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2177        if R::RETRIES_KIND == R2::RETRIES_KIND {
2178            Stream::new(
2179                self.location.drop_consistency(),
2180                self.ir_node.replace(HydroNode::Placeholder),
2181            )
2182        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2183            // We can always weaken the retries guarantee
2184            let target_location = self.location.drop_consistency();
2185            Stream::new(
2186                target_location.clone(),
2187                HydroNode::Cast {
2188                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2189                    metadata: target_location
2190                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2191                },
2192            )
2193        } else {
2194            let target_location = self.location.drop_consistency();
2195            Stream::new(
2196                target_location.clone(),
2197                HydroNode::ObserveNonDet {
2198                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2199                    trusted: false,
2200                    metadata: target_location
2201                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2202                },
2203            )
2204        }
2205    }
2206
2207    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2208    // is not observable
2209    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2210        if R::RETRIES_KIND == R2::RETRIES_KIND {
2211            Stream::new(
2212                self.location.clone(),
2213                self.ir_node.replace(HydroNode::Placeholder),
2214            )
2215        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2216            // We can always weaken the retries guarantee
2217            Stream::new(
2218                self.location.clone(),
2219                HydroNode::Cast {
2220                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2221                    metadata: self
2222                        .location
2223                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2224                },
2225            )
2226        } else {
2227            Stream::new(
2228                self.location.clone(),
2229                HydroNode::ObserveNonDet {
2230                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2231                    trusted: true,
2232                    metadata: self
2233                        .location
2234                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2235                },
2236            )
2237        }
2238    }
2239
2240    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2241    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2242    /// which is always safe because that is the weakest possible guarantee.
2243    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2244        self.weaken_retries::<AtLeastOnce>()
2245    }
2246
2247    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2248    /// enforcing that `R2` is weaker than the input retries guarantee.
2249    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2250        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2251        self.assume_retries_trusted::<R2>(nondet)
2252    }
2253
2254    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2255    /// implies that `R == ExactlyOnce`.
2256    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2257    where
2258        R: IsExactlyOnce,
2259    {
2260        self.assume_retries_trusted(nondet!(/** no-op */))
2261    }
2262
2263    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2264    /// implies that `B == Bounded`.
2265    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2266    where
2267        B: IsBounded,
2268    {
2269        self.weaken_boundedness()
2270    }
2271
2272    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2273    /// which implies that `B == Bounded`.
2274    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2275        if B::BOUNDED == B2::BOUNDED {
2276            Stream::new(
2277                self.location.clone(),
2278                self.ir_node.replace(HydroNode::Placeholder),
2279            )
2280        } else {
2281            // We can always weaken the boundedness
2282            Stream::new(
2283                self.location.clone(),
2284                HydroNode::Cast {
2285                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2286                    metadata: self
2287                        .location
2288                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2289                },
2290            )
2291        }
2292    }
2293}
2294
2295impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2296where
2297    L: Location<'a>,
2298{
2299    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2300    ///
2301    /// # Example
2302    /// ```rust
2303    /// # #[cfg(feature = "deploy")] {
2304    /// # use hydro_lang::prelude::*;
2305    /// # use futures::StreamExt;
2306    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2307    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2308    /// # }, |mut stream| async move {
2309    /// // 1, 2, 3
2310    /// # for w in vec![1, 2, 3] {
2311    /// #     assert_eq!(stream.next().await.unwrap(), w);
2312    /// # }
2313    /// # }));
2314    /// # }
2315    /// ```
2316    pub fn cloned(self) -> Stream<T, L, B, O, R>
2317    where
2318        T: Clone,
2319    {
2320        self.map(q!(|d| d.clone()))
2321    }
2322}
2323
2324impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2325where
2326    L: Location<'a>,
2327{
2328    /// Computes the number of elements in the stream as a [`Singleton`].
2329    ///
2330    /// # Example
2331    /// ```rust
2332    /// # #[cfg(feature = "deploy")] {
2333    /// # use hydro_lang::prelude::*;
2334    /// # use futures::StreamExt;
2335    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2336    /// let tick = process.tick();
2337    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2338    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2339    /// batch.count().all_ticks()
2340    /// # }, |mut stream| async move {
2341    /// // 4
2342    /// # assert_eq!(stream.next().await.unwrap(), 4);
2343    /// # }));
2344    /// # }
2345    /// ```
2346    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2347        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2348            /// Order does not affect eventual count, and also does not affect intermediate states.
2349        ))
2350        .fold(
2351            q!(|| 0usize),
2352            q!(
2353                |count, _| *count += 1,
2354                monotone = manual_proof!(/** += 1 is monotone */)
2355            ),
2356        )
2357    }
2358}
2359
2360impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2361    /// Produces a new stream that merges the elements of the two input streams.
2362    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2363    ///
2364    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2365    /// [`Bounded`], you can use [`Stream::chain`] instead.
2366    ///
2367    /// # Example
2368    /// ```rust
2369    /// # #[cfg(feature = "deploy")] {
2370    /// # use hydro_lang::prelude::*;
2371    /// # use futures::StreamExt;
2372    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2373    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2374    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2375    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2376    /// # }, |mut stream| async move {
2377    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2378    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2379    /// #     assert_eq!(stream.next().await.unwrap(), w);
2380    /// # }
2381    /// # }));
2382    /// # }
2383    /// ```
2384    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2385        self,
2386        other: Stream<T, L, Unbounded, O2, R2>,
2387    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2388    where
2389        R: MinRetries<R2>,
2390    {
2391        Stream::new(
2392            self.location.clone(),
2393            HydroNode::Chain {
2394                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2395                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2396                metadata: self.location.new_node_metadata(Stream::<
2397                    T,
2398                    L,
2399                    Unbounded,
2400                    NoOrder,
2401                    <R as MinRetries<R2>>::Min,
2402                >::collection_kind()),
2403            },
2404        )
2405    }
2406
2407    /// Deprecated: use [`Stream::merge_unordered`] instead.
2408    #[deprecated(note = "use `merge_unordered` instead")]
2409    pub fn interleave<O2: Ordering, R2: Retries>(
2410        self,
2411        other: Stream<T, L, Unbounded, O2, R2>,
2412    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2413    where
2414        R: MinRetries<R2>,
2415    {
2416        self.merge_unordered(other)
2417    }
2418}
2419
2420impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2421    /// Produces a new stream that combines the elements of the two input streams,
2422    /// preserving the relative order of elements within each input.
2423    ///
2424    /// # Non-Determinism
2425    /// The order in which elements *across* the two streams will be interleaved is
2426    /// non-deterministic, so the order of elements will vary across runs. If the output
2427    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2428    /// but emits an unordered stream. For deterministic first-then-second ordering on
2429    /// bounded streams, use [`Stream::chain`].
2430    ///
2431    /// # Example
2432    /// ```rust
2433    /// # #[cfg(feature = "deploy")] {
2434    /// # use hydro_lang::prelude::*;
2435    /// # use futures::StreamExt;
2436    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2437    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2438    /// # process.source_iter(q!(vec![1, 3])).into();
2439    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2440    /// # }, |mut stream| async move {
2441    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2442    /// # for w in vec![1, 3, 2, 4] {
2443    /// #     assert_eq!(stream.next().await.unwrap(), w);
2444    /// # }
2445    /// # }));
2446    /// # }
2447    /// ```
2448    pub fn merge_ordered<R2: Retries>(
2449        self,
2450        other: Stream<T, L, B, TotalOrder, R2>,
2451        _nondet: NonDet,
2452    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2453    where
2454        R: MinRetries<R2>,
2455    {
2456        let target_location = self.location().drop_consistency();
2457        Stream::new(
2458            target_location.clone(),
2459            HydroNode::MergeOrdered {
2460                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2461                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2462                metadata: target_location.new_node_metadata(Stream::<
2463                    T,
2464                    L::DropConsistency,
2465                    B,
2466                    TotalOrder,
2467                    <R as MinRetries<R2>>::Min,
2468                >::collection_kind()),
2469            },
2470        )
2471    }
2472}
2473
2474impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2475where
2476    L: Location<'a>,
2477{
2478    /// Produces a new stream that emits the input elements in sorted order.
2479    ///
2480    /// The input stream can have any ordering guarantee, but the output stream
2481    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2482    /// elements in the input stream are available, so it requires the input stream
2483    /// to be [`Bounded`].
2484    ///
2485    /// # Example
2486    /// ```rust
2487    /// # #[cfg(feature = "deploy")] {
2488    /// # use hydro_lang::prelude::*;
2489    /// # use futures::StreamExt;
2490    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2491    /// let tick = process.tick();
2492    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2493    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2494    /// batch.sort().all_ticks()
2495    /// # }, |mut stream| async move {
2496    /// // 1, 2, 3, 4
2497    /// # for w in (1..5) {
2498    /// #     assert_eq!(stream.next().await.unwrap(), w);
2499    /// # }
2500    /// # }));
2501    /// # }
2502    /// ```
2503    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2504    where
2505        B: IsBounded,
2506        T: Ord,
2507    {
2508        let this = self.make_bounded();
2509        Stream::new(
2510            this.location.clone(),
2511            HydroNode::Sort {
2512                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2513                metadata: this
2514                    .location
2515                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2516            },
2517        )
2518    }
2519
2520    /// Produces a new stream that first emits the elements of the `self` stream,
2521    /// and then emits the elements of the `other` stream. The output stream has
2522    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2523    /// [`TotalOrder`] guarantee.
2524    ///
2525    /// Currently, both input streams must be [`Bounded`]. This operator will block
2526    /// on the first stream until all its elements are available. In a future version,
2527    /// we will relax the requirement on the `other` stream.
2528    ///
2529    /// # Example
2530    /// ```rust
2531    /// # #[cfg(feature = "deploy")] {
2532    /// # use hydro_lang::prelude::*;
2533    /// # use futures::StreamExt;
2534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2535    /// let tick = process.tick();
2536    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2537    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2538    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2539    /// # }, |mut stream| async move {
2540    /// // 2, 3, 4, 5, 1, 2, 3, 4
2541    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2542    /// #     assert_eq!(stream.next().await.unwrap(), w);
2543    /// # }
2544    /// # }));
2545    /// # }
2546    /// ```
2547    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2548        self,
2549        other: Stream<T, L, B2, O2, R2>,
2550    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2551    where
2552        B: IsBounded,
2553        O: MinOrder<O2>,
2554        R: MinRetries<R2>,
2555    {
2556        check_matching_location(&self.location, &other.location);
2557
2558        Stream::new(
2559            self.location.clone(),
2560            HydroNode::Chain {
2561                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2562                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2563                metadata: self.location.new_node_metadata(Stream::<
2564                    T,
2565                    L,
2566                    B2,
2567                    <O as MinOrder<O2>>::Min,
2568                    <R as MinRetries<R2>>::Min,
2569                >::collection_kind()),
2570            },
2571        )
2572    }
2573
2574    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2575    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2576    /// because this is compiled into a nested loop.
2577    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2578        self,
2579        other: Stream<T2, L, Bounded, O2, R>,
2580    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2581    where
2582        B: IsBounded,
2583        T: Clone,
2584        T2: Clone,
2585    {
2586        let this = self.make_bounded();
2587        check_matching_location(&this.location, &other.location);
2588
2589        Stream::new(
2590            this.location.clone(),
2591            HydroNode::CrossProduct {
2592                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2593                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2594                metadata: this.location.new_node_metadata(Stream::<
2595                    (T, T2),
2596                    L,
2597                    Bounded,
2598                    <O2 as MinOrder<O>>::Min,
2599                    R,
2600                >::collection_kind()),
2601            },
2602        )
2603    }
2604
2605    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2606    /// `self` used as the values for *each* key.
2607    ///
2608    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2609    /// values. For example, it can be used to send the same set of elements to several cluster
2610    /// members, if the membership information is available as a [`KeyedSingleton`].
2611    ///
2612    /// # Example
2613    /// ```rust
2614    /// # #[cfg(feature = "deploy")] {
2615    /// # use hydro_lang::prelude::*;
2616    /// # use futures::StreamExt;
2617    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2618    /// # let tick = process.tick();
2619    /// let keyed_singleton = // { 1: (), 2: () }
2620    /// # process
2621    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2622    /// #     .into_keyed()
2623    /// #     .batch(&tick, nondet!(/** test */))
2624    /// #     .first();
2625    /// let stream = // [ "a", "b" ]
2626    /// # process
2627    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2628    /// #     .batch(&tick, nondet!(/** test */));
2629    /// stream.repeat_with_keys(keyed_singleton)
2630    /// # .entries().all_ticks()
2631    /// # }, |mut stream| async move {
2632    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2633    /// # let mut results = Vec::new();
2634    /// # for _ in 0..4 {
2635    /// #     results.push(stream.next().await.unwrap());
2636    /// # }
2637    /// # results.sort();
2638    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2639    /// # }));
2640    /// # }
2641    /// ```
2642    pub fn repeat_with_keys<K, V2>(
2643        self,
2644        keys: KeyedSingleton<K, V2, L, Bounded>,
2645    ) -> KeyedStream<K, T, L, Bounded, O, R>
2646    where
2647        B: IsBounded,
2648        K: Clone,
2649        T: Clone,
2650    {
2651        keys.keys()
2652            .weaken_retries()
2653            .assume_ordering_trusted::<TotalOrder>(
2654                nondet!(/** keyed stream does not depend on ordering of keys */),
2655            )
2656            .cross_product_nested_loop(self.make_bounded())
2657            .into_keyed()
2658    }
2659
2660    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2661    /// execution until all results are available. The output order is based on when futures
2662    /// complete, and may be different than the input order.
2663    ///
2664    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2665    /// while futures are pending, this variant blocks until the futures resolve.
2666    ///
2667    /// # Example
2668    /// ```rust
2669    /// # #[cfg(feature = "deploy")] {
2670    /// # use std::collections::HashSet;
2671    /// # use futures::StreamExt;
2672    /// # use hydro_lang::prelude::*;
2673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2674    /// process
2675    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2676    ///     .map(q!(|x| async move {
2677    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2678    ///         x
2679    ///     }))
2680    ///     .resolve_futures_blocking()
2681    /// #   },
2682    /// #   |mut stream| async move {
2683    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2684    /// #       let mut output = HashSet::new();
2685    /// #       for _ in 1..10 {
2686    /// #           output.insert(stream.next().await.unwrap());
2687    /// #       }
2688    /// #       assert_eq!(
2689    /// #           output,
2690    /// #           HashSet::<i32>::from_iter(1..10)
2691    /// #       );
2692    /// #   },
2693    /// # ));
2694    /// # }
2695    /// ```
2696    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2697    where
2698        T: Future,
2699    {
2700        Stream::new(
2701            self.location.clone(),
2702            HydroNode::ResolveFuturesBlocking {
2703                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2704                metadata: self
2705                    .location
2706                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2707            },
2708        )
2709    }
2710
2711    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2712    ///
2713    /// # Example
2714    /// ```rust
2715    /// # #[cfg(feature = "deploy")] {
2716    /// # use hydro_lang::prelude::*;
2717    /// # use futures::StreamExt;
2718    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2719    /// let tick = process.tick();
2720    /// let empty: Stream<i32, _, Bounded> = process
2721    ///   .source_iter(q!(Vec::<i32>::new()))
2722    ///   .batch(&tick, nondet!(/** test */));
2723    /// empty.is_empty().all_ticks()
2724    /// # }, |mut stream| async move {
2725    /// // true
2726    /// # assert_eq!(stream.next().await.unwrap(), true);
2727    /// # }));
2728    /// # }
2729    /// ```
2730    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2731    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2732    where
2733        B: IsBounded,
2734    {
2735        self.make_bounded()
2736            .assume_ordering_trusted::<TotalOrder>(
2737                nondet!(/** is_empty intermediates unaffected by order */),
2738            )
2739            .first()
2740            .is_none()
2741    }
2742}
2743
2744impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2745where
2746    L: Location<'a>,
2747{
2748    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2749    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2750    /// by equi-joining the two streams on the key attribute `K`.
2751    ///
2752    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2753    /// and streams the left side through, preserving the left side's ordering. When both
2754    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2755    ///
2756    /// # Example
2757    /// ```rust
2758    /// # #[cfg(feature = "deploy")] {
2759    /// # use hydro_lang::prelude::*;
2760    /// # use std::collections::HashSet;
2761    /// # use futures::StreamExt;
2762    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2763    /// let tick = process.tick();
2764    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2765    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2766    /// stream1.join(stream2)
2767    /// # }, |mut stream| async move {
2768    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2769    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2770    /// # stream.map(|i| assert!(expected.contains(&i)));
2771    /// # }));
2772    /// # }
2773    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2774        self,
2775        n: Stream<(K, V2), L, B2, O2, R2>,
2776    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2777    where
2778        K: Eq + Hash + Clone,
2779        R: MinRetries<R2>,
2780        V1: Clone,
2781        V2: Clone,
2782    {
2783        check_matching_location(&self.location, &n.location);
2784
2785        let ir_node = if B2::BOUNDED {
2786            HydroNode::JoinHalf {
2787                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2789                metadata: self.location.new_node_metadata(Stream::<
2790                    (K, (V1, V2)),
2791                    L,
2792                    B,
2793                    B2::PreserveOrderIfBounded<O>,
2794                    <R as MinRetries<R2>>::Min,
2795                >::collection_kind()),
2796            }
2797        } else {
2798            HydroNode::Join {
2799                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2800                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2801                metadata: self.location.new_node_metadata(Stream::<
2802                    (K, (V1, V2)),
2803                    L,
2804                    B,
2805                    B2::PreserveOrderIfBounded<O>,
2806                    <R as MinRetries<R2>>::Min,
2807                >::collection_kind()),
2808            }
2809        };
2810
2811        Stream::new(self.location.clone(), ir_node)
2812    }
2813
2814    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2815    /// computes the anti-join of the items in the input -- i.e. returns
2816    /// unique items in the first input that do not have a matching key
2817    /// in the second input.
2818    ///
2819    /// # Example
2820    /// ```rust
2821    /// # #[cfg(feature = "deploy")] {
2822    /// # use hydro_lang::prelude::*;
2823    /// # use futures::StreamExt;
2824    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2825    /// let tick = process.tick();
2826    /// let stream = process
2827    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2828    ///   .batch(&tick, nondet!(/** test */));
2829    /// let batch = process
2830    ///   .source_iter(q!(vec![1, 2]))
2831    ///   .batch(&tick, nondet!(/** test */));
2832    /// stream.anti_join(batch).all_ticks()
2833    /// # }, |mut stream| async move {
2834    /// # for w in vec![(3, 'c'), (4, 'd')] {
2835    /// #     assert_eq!(stream.next().await.unwrap(), w);
2836    /// # }
2837    /// # }));
2838    /// # }
2839    pub fn anti_join<O2: Ordering, R2: Retries>(
2840        self,
2841        n: Stream<K, L, Bounded, O2, R2>,
2842    ) -> Stream<(K, V1), L, B, O, R>
2843    where
2844        K: Eq + Hash,
2845    {
2846        check_matching_location(&self.location, &n.location);
2847
2848        Stream::new(
2849            self.location.clone(),
2850            HydroNode::AntiJoin {
2851                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2852                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2853                metadata: self
2854                    .location
2855                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2856            },
2857        )
2858    }
2859}
2860
2861impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2862    Stream<(K, V), L, B, O, R>
2863{
2864    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2865    /// is used as the key and the second element is added to the entries associated with that key.
2866    ///
2867    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2868    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2869    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2870    /// total ordering _within_ each group but no ordering _across_ groups.
2871    ///
2872    /// # Example
2873    /// ```rust
2874    /// # #[cfg(feature = "deploy")] {
2875    /// # use hydro_lang::prelude::*;
2876    /// # use futures::StreamExt;
2877    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2878    /// process
2879    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2880    ///     .into_keyed()
2881    /// #   .entries()
2882    /// # }, |mut stream| async move {
2883    /// // { 1: [2, 3], 2: [4] }
2884    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2885    /// #     assert_eq!(stream.next().await.unwrap(), w);
2886    /// # }
2887    /// # }));
2888    /// # }
2889    /// ```
2890    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2891        KeyedStream::new(
2892            self.location.clone(),
2893            HydroNode::Cast {
2894                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2895                metadata: self
2896                    .location
2897                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2898            },
2899        )
2900    }
2901}
2902
2903impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2904where
2905    K: Eq + Hash,
2906    L: Location<'a>,
2907{
2908    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2909    /// # Example
2910    /// ```rust
2911    /// # #[cfg(feature = "deploy")] {
2912    /// # use hydro_lang::prelude::*;
2913    /// # use futures::StreamExt;
2914    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2915    /// let tick = process.tick();
2916    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2917    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2918    /// batch.keys().all_ticks()
2919    /// # }, |mut stream| async move {
2920    /// // 1, 2
2921    /// # assert_eq!(stream.next().await.unwrap(), 1);
2922    /// # assert_eq!(stream.next().await.unwrap(), 2);
2923    /// # }));
2924    /// # }
2925    /// ```
2926    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2927        self.into_keyed()
2928            .fold(
2929                q!(|| ()),
2930                q!(
2931                    |_, _| {},
2932                    commutative = manual_proof!(/** values are ignored */),
2933                    idempotent = manual_proof!(/** values are ignored */)
2934                ),
2935            )
2936            .keys()
2937    }
2938}
2939
2940impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2941where
2942    L: Location<'a>,
2943{
2944    /// Returns a stream corresponding to the latest batch of elements being atomically
2945    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2946    /// the order of the input.
2947    ///
2948    /// # Non-Determinism
2949    /// The batch boundaries are non-deterministic and may change across executions.
2950    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2951        self,
2952        tick: &Tick<L2>,
2953        _nondet: NonDet,
2954    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2955        Stream::new(
2956            tick.drop_consistency(),
2957            HydroNode::Batch {
2958                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2959                metadata: tick
2960                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2961            },
2962        )
2963    }
2964
2965    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2966    /// See [`Stream::atomic`] for more details.
2967    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2968        Stream::new(
2969            self.location.tick.l.clone(),
2970            HydroNode::EndAtomic {
2971                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2972                metadata: self
2973                    .location
2974                    .tick
2975                    .l
2976                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2977            },
2978        )
2979    }
2980}
2981
2982impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2983where
2984    L: TopLevel<'a>,
2985    F: Future<Output = T>,
2986{
2987    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2988    /// Future outputs are produced as available, regardless of input arrival order.
2989    ///
2990    /// # Example
2991    /// ```rust
2992    /// # #[cfg(feature = "deploy")] {
2993    /// # use std::collections::HashSet;
2994    /// # use futures::StreamExt;
2995    /// # use hydro_lang::prelude::*;
2996    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2997    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2998    ///     .map(q!(|x| async move {
2999    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3000    ///         x
3001    ///     }))
3002    ///     .resolve_futures()
3003    /// #   },
3004    /// #   |mut stream| async move {
3005    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3006    /// #       let mut output = HashSet::new();
3007    /// #       for _ in 1..10 {
3008    /// #           output.insert(stream.next().await.unwrap());
3009    /// #       }
3010    /// #       assert_eq!(
3011    /// #           output,
3012    /// #           HashSet::<i32>::from_iter(1..10)
3013    /// #       );
3014    /// #   },
3015    /// # ));
3016    /// # }
3017    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3018        Stream::new(
3019            self.location.clone(),
3020            HydroNode::ResolveFutures {
3021                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3022                metadata: self
3023                    .location
3024                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3025            },
3026        )
3027    }
3028
3029    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3030    /// Future outputs are produced in the same order as the input stream.
3031    ///
3032    /// # Example
3033    /// ```rust
3034    /// # #[cfg(feature = "deploy")] {
3035    /// # use std::collections::HashSet;
3036    /// # use futures::StreamExt;
3037    /// # use hydro_lang::prelude::*;
3038    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3039    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3040    ///     .map(q!(|x| async move {
3041    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3042    ///         x
3043    ///     }))
3044    ///     .resolve_futures_ordered()
3045    /// #   },
3046    /// #   |mut stream| async move {
3047    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3048    /// #       let mut output = Vec::new();
3049    /// #       for _ in 1..10 {
3050    /// #           output.push(stream.next().await.unwrap());
3051    /// #       }
3052    /// #       assert_eq!(
3053    /// #           output,
3054    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3055    /// #       );
3056    /// #   },
3057    /// # ));
3058    /// # }
3059    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3060        Stream::new(
3061            self.location.clone(),
3062            HydroNode::ResolveFuturesOrdered {
3063                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3064                metadata: self
3065                    .location
3066                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3067            },
3068        )
3069    }
3070}
3071
3072impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3073where
3074    L: Location<'a>,
3075{
3076    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3077    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3078    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3079        Stream::new(
3080            self.location.outer().clone(),
3081            HydroNode::YieldConcat {
3082                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3083                metadata: self
3084                    .location
3085                    .outer()
3086                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3087            },
3088        )
3089    }
3090
3091    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3092    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3093    ///
3094    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3095    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3096    /// stream's [`Tick`] context.
3097    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3098        let out_location = Atomic {
3099            tick: self.location.clone(),
3100        };
3101
3102        Stream::new(
3103            out_location.clone(),
3104            HydroNode::YieldConcat {
3105                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3106                metadata: out_location
3107                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3108            },
3109        )
3110    }
3111
3112    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3113    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3114    /// input.
3115    ///
3116    /// This API is particularly useful for stateful computation on batches of data, such as
3117    /// maintaining an accumulated state that is up to date with the current batch.
3118    ///
3119    /// # Example
3120    /// ```rust
3121    /// # #[cfg(feature = "deploy")] {
3122    /// # use hydro_lang::prelude::*;
3123    /// # use futures::StreamExt;
3124    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3125    /// let tick = process.tick();
3126    /// # // ticks are lazy by default, forces the second tick to run
3127    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3128    /// # let batch_first_tick = process
3129    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3130    /// #  .batch(&tick, nondet!(/** test */));
3131    /// # let batch_second_tick = process
3132    /// #   .source_iter(q!(vec![5, 6, 7]))
3133    /// #   .batch(&tick, nondet!(/** test */))
3134    /// #   .defer_tick(); // appears on the second tick
3135    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3136    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3137    ///
3138    /// input.batch(&tick, nondet!(/** test */))
3139    ///     .across_ticks(|s| s.count()).all_ticks()
3140    /// # }, |mut stream| async move {
3141    /// // [4, 7]
3142    /// assert_eq!(stream.next().await.unwrap(), 4);
3143    /// assert_eq!(stream.next().await.unwrap(), 7);
3144    /// # }));
3145    /// # }
3146    /// ```
3147    pub fn across_ticks<Out: BatchAtomic<'a>>(
3148        self,
3149        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3150    ) -> Out::Batched {
3151        thunk(self.all_ticks_atomic()).batched_atomic()
3152    }
3153
3154    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3155    /// always has the elements of `self` at tick `T - 1`.
3156    ///
3157    /// At tick `0`, the output stream is empty, since there is no previous tick.
3158    ///
3159    /// This operator enables stateful iterative processing with ticks, by sending data from one
3160    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3161    ///
3162    /// # Example
3163    /// ```rust
3164    /// # #[cfg(feature = "deploy")] {
3165    /// # use hydro_lang::prelude::*;
3166    /// # use futures::StreamExt;
3167    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3168    /// let tick = process.tick();
3169    /// // ticks are lazy by default, forces the second tick to run
3170    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3171    ///
3172    /// let batch_first_tick = process
3173    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3174    ///   .batch(&tick, nondet!(/** test */));
3175    /// let batch_second_tick = process
3176    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3177    ///   .batch(&tick, nondet!(/** test */))
3178    ///   .defer_tick(); // appears on the second tick
3179    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3180    ///
3181    /// changes_across_ticks.clone().filter_not_in(
3182    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3183    /// ).all_ticks()
3184    /// # }, |mut stream| async move {
3185    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3186    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3187    /// #     assert_eq!(stream.next().await.unwrap(), w);
3188    /// # }
3189    /// # }));
3190    /// # }
3191    /// ```
3192    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3193        Stream::new(
3194            self.location.clone(),
3195            HydroNode::DeferTick {
3196                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3197                metadata: self
3198                    .location
3199                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3200            },
3201        )
3202    }
3203}
3204
3205#[cfg(test)]
3206mod tests {
3207    #[cfg(feature = "deploy")]
3208    use futures::{SinkExt, StreamExt};
3209    #[cfg(feature = "deploy")]
3210    use hydro_deploy::Deployment;
3211    #[cfg(feature = "deploy")]
3212    use serde::{Deserialize, Serialize};
3213    #[cfg(any(feature = "deploy", feature = "sim"))]
3214    use stageleft::q;
3215
3216    #[cfg(any(feature = "deploy", feature = "sim"))]
3217    use crate::compile::builder::FlowBuilder;
3218    #[cfg(feature = "deploy")]
3219    use crate::live_collections::sliced::sliced;
3220    #[cfg(feature = "deploy")]
3221    use crate::live_collections::stream::ExactlyOnce;
3222    #[cfg(feature = "sim")]
3223    use crate::live_collections::stream::NoOrder;
3224    #[cfg(any(feature = "deploy", feature = "sim"))]
3225    use crate::live_collections::stream::TotalOrder;
3226    #[cfg(any(feature = "deploy", feature = "sim"))]
3227    use crate::location::Location;
3228    #[cfg(feature = "sim")]
3229    use crate::networking::TCP;
3230    #[cfg(any(feature = "deploy", feature = "sim"))]
3231    use crate::nondet::nondet;
3232
3233    mod backtrace_chained_ops;
3234
3235    #[cfg(feature = "deploy")]
3236    struct P1 {}
3237    #[cfg(feature = "deploy")]
3238    struct P2 {}
3239
3240    #[cfg(feature = "deploy")]
3241    #[derive(Serialize, Deserialize, Debug)]
3242    struct SendOverNetwork {
3243        n: u32,
3244    }
3245
3246    #[cfg(feature = "deploy")]
3247    #[tokio::test]
3248    async fn first_ten_distributed() {
3249        use crate::networking::TCP;
3250
3251        let mut deployment = Deployment::new();
3252
3253        let mut flow = FlowBuilder::new();
3254        let first_node = flow.process::<P1>();
3255        let second_node = flow.process::<P2>();
3256        let external = flow.external::<P2>();
3257
3258        let numbers = first_node.source_iter(q!(0..10));
3259        let out_port = numbers
3260            .map(q!(|n| SendOverNetwork { n }))
3261            .send(&second_node, TCP.fail_stop().bincode())
3262            .send_bincode_external(&external);
3263
3264        let nodes = flow
3265            .with_process(&first_node, deployment.Localhost())
3266            .with_process(&second_node, deployment.Localhost())
3267            .with_external(&external, deployment.Localhost())
3268            .deploy(&mut deployment);
3269
3270        deployment.deploy().await.unwrap();
3271
3272        let mut external_out = nodes.connect(out_port).await;
3273
3274        deployment.start().await.unwrap();
3275
3276        for i in 0..10 {
3277            assert_eq!(external_out.next().await.unwrap().n, i);
3278        }
3279    }
3280
3281    #[cfg(feature = "deploy")]
3282    #[tokio::test]
3283    async fn first_cardinality() {
3284        let mut deployment = Deployment::new();
3285
3286        let mut flow = FlowBuilder::new();
3287        let node = flow.process::<()>();
3288        let external = flow.external::<()>();
3289
3290        let node_tick = node.tick();
3291        let count = node_tick
3292            .singleton(q!([1, 2, 3]))
3293            .into_stream()
3294            .flatten_ordered()
3295            .first()
3296            .into_stream()
3297            .count()
3298            .all_ticks()
3299            .send_bincode_external(&external);
3300
3301        let nodes = flow
3302            .with_process(&node, deployment.Localhost())
3303            .with_external(&external, deployment.Localhost())
3304            .deploy(&mut deployment);
3305
3306        deployment.deploy().await.unwrap();
3307
3308        let mut external_out = nodes.connect(count).await;
3309
3310        deployment.start().await.unwrap();
3311
3312        assert_eq!(external_out.next().await.unwrap(), 1);
3313    }
3314
3315    #[cfg(feature = "deploy")]
3316    #[tokio::test]
3317    async fn unbounded_reduce_remembers_state() {
3318        let mut deployment = Deployment::new();
3319
3320        let mut flow = FlowBuilder::new();
3321        let node = flow.process::<()>();
3322        let external = flow.external::<()>();
3323
3324        let (input_port, input) = node.source_external_bincode(&external);
3325        let out = input
3326            .reduce(q!(|acc, v| *acc += v))
3327            .sample_eager(nondet!(/** test */))
3328            .send_bincode_external(&external);
3329
3330        let nodes = flow
3331            .with_process(&node, deployment.Localhost())
3332            .with_external(&external, deployment.Localhost())
3333            .deploy(&mut deployment);
3334
3335        deployment.deploy().await.unwrap();
3336
3337        let mut external_in = nodes.connect(input_port).await;
3338        let mut external_out = nodes.connect(out).await;
3339
3340        deployment.start().await.unwrap();
3341
3342        external_in.send(1).await.unwrap();
3343        assert_eq!(external_out.next().await.unwrap(), 1);
3344
3345        external_in.send(2).await.unwrap();
3346        assert_eq!(external_out.next().await.unwrap(), 3);
3347    }
3348
3349    #[cfg(feature = "deploy")]
3350    #[tokio::test]
3351    async fn top_level_bounded_cross_singleton() {
3352        let mut deployment = Deployment::new();
3353
3354        let mut flow = FlowBuilder::new();
3355        let node = flow.process::<()>();
3356        let external = flow.external::<()>();
3357
3358        let (input_port, input) =
3359            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3360
3361        let out = input
3362            .cross_singleton(
3363                node.source_iter(q!(vec![1, 2, 3]))
3364                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3365            )
3366            .send_bincode_external(&external);
3367
3368        let nodes = flow
3369            .with_process(&node, deployment.Localhost())
3370            .with_external(&external, deployment.Localhost())
3371            .deploy(&mut deployment);
3372
3373        deployment.deploy().await.unwrap();
3374
3375        let mut external_in = nodes.connect(input_port).await;
3376        let mut external_out = nodes.connect(out).await;
3377
3378        deployment.start().await.unwrap();
3379
3380        external_in.send(1).await.unwrap();
3381        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3382
3383        external_in.send(2).await.unwrap();
3384        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3385    }
3386
3387    #[cfg(feature = "deploy")]
3388    #[tokio::test]
3389    async fn top_level_bounded_reduce_cardinality() {
3390        let mut deployment = Deployment::new();
3391
3392        let mut flow = FlowBuilder::new();
3393        let node = flow.process::<()>();
3394        let external = flow.external::<()>();
3395
3396        let (input_port, input) =
3397            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3398
3399        let out = sliced! {
3400            let input = use(input, nondet!(/** test */));
3401            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3402            input.cross_singleton(v.into_stream().count())
3403        }
3404        .send_bincode_external(&external);
3405
3406        let nodes = flow
3407            .with_process(&node, deployment.Localhost())
3408            .with_external(&external, deployment.Localhost())
3409            .deploy(&mut deployment);
3410
3411        deployment.deploy().await.unwrap();
3412
3413        let mut external_in = nodes.connect(input_port).await;
3414        let mut external_out = nodes.connect(out).await;
3415
3416        deployment.start().await.unwrap();
3417
3418        external_in.send(1).await.unwrap();
3419        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3420
3421        external_in.send(2).await.unwrap();
3422        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3423    }
3424
3425    #[cfg(feature = "deploy")]
3426    #[tokio::test]
3427    async fn top_level_bounded_into_singleton_cardinality() {
3428        let mut deployment = Deployment::new();
3429
3430        let mut flow = FlowBuilder::new();
3431        let node = flow.process::<()>();
3432        let external = flow.external::<()>();
3433
3434        let (input_port, input) =
3435            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3436
3437        let out = sliced! {
3438            let input = use(input, nondet!(/** test */));
3439            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3440            input.cross_singleton(v.into_stream().count())
3441        }
3442        .send_bincode_external(&external);
3443
3444        let nodes = flow
3445            .with_process(&node, deployment.Localhost())
3446            .with_external(&external, deployment.Localhost())
3447            .deploy(&mut deployment);
3448
3449        deployment.deploy().await.unwrap();
3450
3451        let mut external_in = nodes.connect(input_port).await;
3452        let mut external_out = nodes.connect(out).await;
3453
3454        deployment.start().await.unwrap();
3455
3456        external_in.send(1).await.unwrap();
3457        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3458
3459        external_in.send(2).await.unwrap();
3460        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3461    }
3462
3463    #[cfg(feature = "deploy")]
3464    #[tokio::test]
3465    async fn atomic_fold_replays_each_tick() {
3466        let mut deployment = Deployment::new();
3467
3468        let mut flow = FlowBuilder::new();
3469        let node = flow.process::<()>();
3470        let external = flow.external::<()>();
3471
3472        let (input_port, input) =
3473            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3474        let tick = node.tick();
3475
3476        let out = input
3477            .batch(&tick, nondet!(/** test */))
3478            .cross_singleton(
3479                node.source_iter(q!(vec![1, 2, 3]))
3480                    .atomic()
3481                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3482                    .snapshot_atomic(&tick, nondet!(/** test */)),
3483            )
3484            .all_ticks()
3485            .send_bincode_external(&external);
3486
3487        let nodes = flow
3488            .with_process(&node, deployment.Localhost())
3489            .with_external(&external, deployment.Localhost())
3490            .deploy(&mut deployment);
3491
3492        deployment.deploy().await.unwrap();
3493
3494        let mut external_in = nodes.connect(input_port).await;
3495        let mut external_out = nodes.connect(out).await;
3496
3497        deployment.start().await.unwrap();
3498
3499        external_in.send(1).await.unwrap();
3500        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3501
3502        external_in.send(2).await.unwrap();
3503        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3504    }
3505
3506    #[cfg(feature = "deploy")]
3507    #[tokio::test]
3508    async fn unbounded_scan_remembers_state() {
3509        let mut deployment = Deployment::new();
3510
3511        let mut flow = FlowBuilder::new();
3512        let node = flow.process::<()>();
3513        let external = flow.external::<()>();
3514
3515        let (input_port, input) = node.source_external_bincode(&external);
3516        let out = input
3517            .scan(
3518                q!(|| 0),
3519                q!(|acc, v| {
3520                    *acc += v;
3521                    Some(*acc)
3522                }),
3523            )
3524            .send_bincode_external(&external);
3525
3526        let nodes = flow
3527            .with_process(&node, deployment.Localhost())
3528            .with_external(&external, deployment.Localhost())
3529            .deploy(&mut deployment);
3530
3531        deployment.deploy().await.unwrap();
3532
3533        let mut external_in = nodes.connect(input_port).await;
3534        let mut external_out = nodes.connect(out).await;
3535
3536        deployment.start().await.unwrap();
3537
3538        external_in.send(1).await.unwrap();
3539        assert_eq!(external_out.next().await.unwrap(), 1);
3540
3541        external_in.send(2).await.unwrap();
3542        assert_eq!(external_out.next().await.unwrap(), 3);
3543    }
3544
3545    #[cfg(feature = "deploy")]
3546    #[tokio::test]
3547    async fn unbounded_enumerate_remembers_state() {
3548        let mut deployment = Deployment::new();
3549
3550        let mut flow = FlowBuilder::new();
3551        let node = flow.process::<()>();
3552        let external = flow.external::<()>();
3553
3554        let (input_port, input) = node.source_external_bincode(&external);
3555        let out = input.enumerate().send_bincode_external(&external);
3556
3557        let nodes = flow
3558            .with_process(&node, deployment.Localhost())
3559            .with_external(&external, deployment.Localhost())
3560            .deploy(&mut deployment);
3561
3562        deployment.deploy().await.unwrap();
3563
3564        let mut external_in = nodes.connect(input_port).await;
3565        let mut external_out = nodes.connect(out).await;
3566
3567        deployment.start().await.unwrap();
3568
3569        external_in.send(1).await.unwrap();
3570        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3571
3572        external_in.send(2).await.unwrap();
3573        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3574    }
3575
3576    #[cfg(feature = "deploy")]
3577    #[tokio::test]
3578    async fn unbounded_unique_remembers_state() {
3579        let mut deployment = Deployment::new();
3580
3581        let mut flow = FlowBuilder::new();
3582        let node = flow.process::<()>();
3583        let external = flow.external::<()>();
3584
3585        let (input_port, input) =
3586            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3587        let out = input.unique().send_bincode_external(&external);
3588
3589        let nodes = flow
3590            .with_process(&node, deployment.Localhost())
3591            .with_external(&external, deployment.Localhost())
3592            .deploy(&mut deployment);
3593
3594        deployment.deploy().await.unwrap();
3595
3596        let mut external_in = nodes.connect(input_port).await;
3597        let mut external_out = nodes.connect(out).await;
3598
3599        deployment.start().await.unwrap();
3600
3601        external_in.send(1).await.unwrap();
3602        assert_eq!(external_out.next().await.unwrap(), 1);
3603
3604        external_in.send(2).await.unwrap();
3605        assert_eq!(external_out.next().await.unwrap(), 2);
3606
3607        external_in.send(1).await.unwrap();
3608        external_in.send(3).await.unwrap();
3609        assert_eq!(external_out.next().await.unwrap(), 3);
3610    }
3611
3612    #[cfg(feature = "sim")]
3613    #[test]
3614    #[should_panic]
3615    fn sim_batch_nondet_size() {
3616        let mut flow = FlowBuilder::new();
3617        let node = flow.process::<()>();
3618
3619        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3620
3621        let tick = node.tick();
3622        let out_recv = input
3623            .batch(&tick, nondet!(/** test */))
3624            .count()
3625            .all_ticks()
3626            .sim_output();
3627
3628        flow.sim().exhaustive(async || {
3629            in_send.send(());
3630            in_send.send(());
3631            in_send.send(());
3632
3633            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3634        });
3635    }
3636
3637    #[cfg(feature = "sim")]
3638    #[test]
3639    fn sim_batch_preserves_order() {
3640        let mut flow = FlowBuilder::new();
3641        let node = flow.process::<()>();
3642
3643        let (in_send, input) = node.sim_input();
3644
3645        let tick = node.tick();
3646        let out_recv = input
3647            .batch(&tick, nondet!(/** test */))
3648            .all_ticks()
3649            .sim_output();
3650
3651        flow.sim().exhaustive(async || {
3652            in_send.send(1);
3653            in_send.send(2);
3654            in_send.send(3);
3655
3656            out_recv.assert_yields_only([1, 2, 3]).await;
3657        });
3658    }
3659
3660    #[cfg(feature = "sim")]
3661    #[test]
3662    #[should_panic]
3663    fn sim_batch_unordered_shuffles() {
3664        let mut flow = FlowBuilder::new();
3665        let node = flow.process::<()>();
3666
3667        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3668
3669        let tick = node.tick();
3670        let batch = input.batch(&tick, nondet!(/** test */));
3671        let out_recv = batch
3672            .clone()
3673            .min()
3674            .zip(batch.max())
3675            .all_ticks()
3676            .sim_output();
3677
3678        flow.sim().exhaustive(async || {
3679            in_send.send_many_unordered([1, 2, 3]);
3680
3681            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3682                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3683            }
3684        });
3685    }
3686
3687    #[cfg(feature = "sim")]
3688    #[test]
3689    fn sim_batch_unordered_shuffles_count() {
3690        let mut flow = FlowBuilder::new();
3691        let node = flow.process::<()>();
3692
3693        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3694
3695        let tick = node.tick();
3696        let batch = input.batch(&tick, nondet!(/** test */));
3697        let out_recv = batch.all_ticks().sim_output();
3698
3699        let instance_count = flow.sim().exhaustive(async || {
3700            in_send.send_many_unordered([1, 2, 3, 4]);
3701            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3702        });
3703
3704        assert_eq!(
3705            instance_count,
3706            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3707        )
3708    }
3709
3710    #[cfg(feature = "sim")]
3711    #[test]
3712    #[should_panic]
3713    fn sim_observe_order_batched() {
3714        let mut flow = FlowBuilder::new();
3715        let node = flow.process::<()>();
3716
3717        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3718
3719        let tick = node.tick();
3720        let batch = input.batch(&tick, nondet!(/** test */));
3721        let out_recv = batch
3722            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3723            .all_ticks()
3724            .sim_output();
3725
3726        flow.sim().exhaustive(async || {
3727            in_send.send_many_unordered([1, 2, 3, 4]);
3728            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3729        });
3730    }
3731
3732    #[cfg(feature = "sim")]
3733    #[test]
3734    fn sim_observe_order_batched_count() {
3735        let mut flow = FlowBuilder::new();
3736        let node = flow.process::<()>();
3737
3738        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3739
3740        let tick = node.tick();
3741        let batch = input.batch(&tick, nondet!(/** test */));
3742        let out_recv = batch
3743            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3744            .all_ticks()
3745            .sim_output();
3746
3747        let instance_count = flow.sim().exhaustive(async || {
3748            in_send.send_many_unordered([1, 2, 3, 4]);
3749            let _ = out_recv.collect::<Vec<_>>().await;
3750        });
3751
3752        assert_eq!(
3753            instance_count,
3754            192 // 4! * 2^{4 - 1}
3755        )
3756    }
3757
3758    #[cfg(feature = "sim")]
3759    #[test]
3760    fn sim_unordered_count_instance_count() {
3761        let mut flow = FlowBuilder::new();
3762        let node = flow.process::<()>();
3763
3764        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3765
3766        let tick = node.tick();
3767        let out_recv = input
3768            .count()
3769            .snapshot(&tick, nondet!(/** test */))
3770            .all_ticks()
3771            .sim_output();
3772
3773        let instance_count = flow.sim().exhaustive(async || {
3774            in_send.send_many_unordered([1, 2, 3, 4]);
3775            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3776        });
3777
3778        assert_eq!(
3779            instance_count,
3780            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3781        )
3782    }
3783
3784    #[cfg(feature = "sim")]
3785    #[test]
3786    fn sim_top_level_assume_ordering() {
3787        let mut flow = FlowBuilder::new();
3788        let node = flow.process::<()>();
3789
3790        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3791
3792        let out_recv = input
3793            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3794            .sim_output();
3795
3796        let instance_count = flow.sim().exhaustive(async || {
3797            in_send.send_many_unordered([1, 2, 3]);
3798            let mut out = out_recv.collect::<Vec<_>>().await;
3799            out.sort();
3800            assert_eq!(out, vec![1, 2, 3]);
3801        });
3802
3803        assert_eq!(instance_count, 6)
3804    }
3805
3806    #[cfg(feature = "sim")]
3807    #[test]
3808    fn sim_top_level_assume_ordering_cycle_back() {
3809        let mut flow = FlowBuilder::new();
3810        let node = flow.process::<()>();
3811        let node2 = flow.process::<()>();
3812
3813        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3814
3815        let (complete_cycle_back, cycle_back) =
3816            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3817        let ordered = input
3818            .merge_unordered(cycle_back)
3819            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3820        complete_cycle_back.complete(
3821            ordered
3822                .clone()
3823                .map(q!(|v| v + 1))
3824                .filter(q!(|v| v % 2 == 1))
3825                .send(&node2, TCP.fail_stop().bincode())
3826                .send(&node, TCP.fail_stop().bincode()),
3827        );
3828
3829        let out_recv = ordered.sim_output();
3830
3831        let mut saw = false;
3832        let instance_count = flow.sim().exhaustive(async || {
3833            in_send.send_many_unordered([0, 2]);
3834            let out = out_recv.collect::<Vec<_>>().await;
3835
3836            if out.starts_with(&[0, 1, 2]) {
3837                saw = true;
3838            }
3839        });
3840
3841        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3842        assert_eq!(instance_count, 6);
3843    }
3844
3845    #[cfg(feature = "sim")]
3846    #[test]
3847    fn sim_top_level_assume_ordering_cycle_back_tick() {
3848        let mut flow = FlowBuilder::new();
3849        let node = flow.process::<()>();
3850        let node2 = flow.process::<()>();
3851
3852        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3853
3854        let (complete_cycle_back, cycle_back) =
3855            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3856        let ordered = input
3857            .merge_unordered(cycle_back)
3858            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3859        complete_cycle_back.complete(
3860            ordered
3861                .clone()
3862                .batch(&node.tick(), nondet!(/** test */))
3863                .all_ticks()
3864                .map(q!(|v| v + 1))
3865                .filter(q!(|v| v % 2 == 1))
3866                .send(&node2, TCP.fail_stop().bincode())
3867                .send(&node, TCP.fail_stop().bincode()),
3868        );
3869
3870        let out_recv = ordered.sim_output();
3871
3872        let mut saw = false;
3873        let instance_count = flow.sim().exhaustive(async || {
3874            in_send.send_many_unordered([0, 2]);
3875            let out = out_recv.collect::<Vec<_>>().await;
3876
3877            if out.starts_with(&[0, 1, 2]) {
3878                saw = true;
3879            }
3880        });
3881
3882        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3883        assert_eq!(instance_count, 58);
3884    }
3885
3886    #[cfg(feature = "sim")]
3887    #[test]
3888    fn sim_top_level_assume_ordering_multiple() {
3889        let mut flow = FlowBuilder::new();
3890        let node = flow.process::<()>();
3891        let node2 = flow.process::<()>();
3892
3893        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3894        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3895
3896        let (complete_cycle_back, cycle_back) =
3897            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3898        let input1_ordered = input
3899            .clone()
3900            .merge_unordered(cycle_back)
3901            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3902        let foo = input1_ordered
3903            .clone()
3904            .map(q!(|v| v + 3))
3905            .weaken_ordering::<NoOrder>()
3906            .merge_unordered(input2)
3907            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3908
3909        complete_cycle_back.complete(
3910            foo.filter(q!(|v| *v == 3))
3911                .send(&node2, TCP.fail_stop().bincode())
3912                .send(&node, TCP.fail_stop().bincode()),
3913        );
3914
3915        let out_recv = input1_ordered.sim_output();
3916
3917        let mut saw = false;
3918        let instance_count = flow.sim().exhaustive(async || {
3919            in_send.send_many_unordered([0, 1]);
3920            let out = out_recv.collect::<Vec<_>>().await;
3921
3922            if out.starts_with(&[0, 3, 1]) {
3923                saw = true;
3924            }
3925        });
3926
3927        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3928        assert_eq!(instance_count, 24);
3929    }
3930
3931    #[cfg(feature = "sim")]
3932    #[test]
3933    fn sim_atomic_assume_ordering_cycle_back() {
3934        let mut flow = FlowBuilder::new();
3935        let node = flow.process::<()>();
3936        let node2 = flow.process::<()>();
3937
3938        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3939
3940        let (complete_cycle_back, cycle_back) =
3941            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3942        let ordered = input
3943            .merge_unordered(cycle_back)
3944            .atomic()
3945            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3946            .end_atomic();
3947        complete_cycle_back.complete(
3948            ordered
3949                .clone()
3950                .map(q!(|v| v + 1))
3951                .filter(q!(|v| v % 2 == 1))
3952                .send(&node2, TCP.fail_stop().bincode())
3953                .send(&node, TCP.fail_stop().bincode()),
3954        );
3955
3956        let out_recv = ordered.sim_output();
3957
3958        let instance_count = flow.sim().exhaustive(async || {
3959            in_send.send_many_unordered([0, 2]);
3960            let out = out_recv.collect::<Vec<_>>().await;
3961            assert_eq!(out.len(), 4);
3962        });
3963        assert_eq!(instance_count, 22);
3964    }
3965
3966    #[cfg(feature = "deploy")]
3967    #[tokio::test]
3968    async fn partition_evens_odds() {
3969        let mut deployment = Deployment::new();
3970
3971        let mut flow = FlowBuilder::new();
3972        let node = flow.process::<()>();
3973        let external = flow.external::<()>();
3974
3975        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3976        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3977        let evens_port = evens.send_bincode_external(&external);
3978        let odds_port = odds.send_bincode_external(&external);
3979
3980        let nodes = flow
3981            .with_process(&node, deployment.Localhost())
3982            .with_external(&external, deployment.Localhost())
3983            .deploy(&mut deployment);
3984
3985        deployment.deploy().await.unwrap();
3986
3987        let mut evens_out = nodes.connect(evens_port).await;
3988        let mut odds_out = nodes.connect(odds_port).await;
3989
3990        deployment.start().await.unwrap();
3991
3992        let mut even_results = Vec::new();
3993        for _ in 0..3 {
3994            even_results.push(evens_out.next().await.unwrap());
3995        }
3996        even_results.sort();
3997        assert_eq!(even_results, vec![2, 4, 6]);
3998
3999        let mut odd_results = Vec::new();
4000        for _ in 0..3 {
4001            odd_results.push(odds_out.next().await.unwrap());
4002        }
4003        odd_results.sort();
4004        assert_eq!(odd_results, vec![1, 3, 5]);
4005    }
4006
4007    #[cfg(feature = "deploy")]
4008    #[tokio::test]
4009    async fn unconsumed_inspect_still_runs() {
4010        use crate::deploy::DeployCrateWrapper;
4011
4012        let mut deployment = Deployment::new();
4013
4014        let mut flow = FlowBuilder::new();
4015        let node = flow.process::<()>();
4016
4017        // The return value of .inspect() is intentionally dropped.
4018        // Before the Null-root fix, this would silently do nothing.
4019        node.source_iter(q!(0..5))
4020            .inspect(q!(|x| println!("inspect: {}", x)));
4021
4022        let nodes = flow
4023            .with_process(&node, deployment.Localhost())
4024            .deploy(&mut deployment);
4025
4026        deployment.deploy().await.unwrap();
4027
4028        let mut stdout = nodes.get_process(&node).stdout();
4029
4030        deployment.start().await.unwrap();
4031
4032        let mut lines = Vec::new();
4033        for _ in 0..5 {
4034            lines.push(stdout.recv().await.unwrap());
4035        }
4036        lines.sort();
4037        assert_eq!(
4038            lines,
4039            vec![
4040                "inspect: 0",
4041                "inspect: 1",
4042                "inspect: 2",
4043                "inspect: 3",
4044                "inspect: 4",
4045            ]
4046        );
4047    }
4048
4049    #[cfg(feature = "sim")]
4050    #[test]
4051    fn sim_limit() {
4052        let mut flow = FlowBuilder::new();
4053        let node = flow.process::<()>();
4054
4055        let (in_send, input) = node.sim_input();
4056
4057        let out_recv = input.limit(q!(3)).sim_output();
4058
4059        flow.sim().exhaustive(async || {
4060            in_send.send(1);
4061            in_send.send(2);
4062            in_send.send(3);
4063            in_send.send(4);
4064            in_send.send(5);
4065
4066            out_recv.assert_yields_only([1, 2, 3]).await;
4067        });
4068    }
4069
4070    #[cfg(feature = "sim")]
4071    #[test]
4072    fn sim_limit_zero() {
4073        let mut flow = FlowBuilder::new();
4074        let node = flow.process::<()>();
4075
4076        let (in_send, input) = node.sim_input();
4077
4078        let out_recv = input.limit(q!(0)).sim_output();
4079
4080        flow.sim().exhaustive(async || {
4081            in_send.send(1);
4082            in_send.send(2);
4083
4084            out_recv.assert_yields_only::<i32, _>([]).await;
4085        });
4086    }
4087
4088    #[cfg(feature = "sim")]
4089    #[test]
4090    fn sim_merge_ordered() {
4091        let mut flow = FlowBuilder::new();
4092        let node = flow.process::<()>();
4093
4094        let (in_send, input) = node.sim_input();
4095        let (in_send2, input2) = node.sim_input();
4096
4097        let out_recv = input
4098            .merge_ordered(input2, nondet!(/** test */))
4099            .sim_output();
4100
4101        let mut saw_out_of_order = false;
4102        let instances = flow.sim().exhaustive(async || {
4103            in_send.send(1);
4104            in_send.send(2);
4105            in_send2.send(3);
4106            in_send2.send(4);
4107
4108            let out = out_recv.collect::<Vec<_>>().await;
4109
4110            if out == [1, 3, 2, 4] {
4111                saw_out_of_order = true;
4112            }
4113
4114            // Assert ordering preservation: elements from each input must
4115            // appear in their original relative order.
4116            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4117            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4118            assert_eq!(
4119                first_elements,
4120                vec![1, 2],
4121                "first input order violated: {:?}",
4122                out
4123            );
4124            assert_eq!(
4125                second_elements,
4126                vec![3, 4],
4127                "second input order violated: {:?}",
4128                out
4129            );
4130
4131            first_elements.append(&mut second_elements);
4132            first_elements.sort();
4133            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4134        });
4135
4136        assert!(saw_out_of_order);
4137        assert_eq!(instances, 6);
4138    }
4139
4140    /// Tests that merge_ordered passes through elements when only one input
4141    /// has data.
4142    #[cfg(feature = "sim")]
4143    #[test]
4144    fn sim_merge_ordered_one_empty() {
4145        let mut flow = FlowBuilder::new();
4146        let node = flow.process::<()>();
4147
4148        let (in_send, input) = node.sim_input();
4149        let (_in_send2, input2) = node.sim_input();
4150
4151        let out_recv = input
4152            .merge_ordered(input2, nondet!(/** test */))
4153            .sim_output();
4154
4155        let instances = flow.sim().exhaustive(async || {
4156            in_send.send(1);
4157            in_send.send(2);
4158
4159            let out = out_recv.collect::<Vec<_>>().await;
4160            assert_eq!(out, vec![1, 2]);
4161        });
4162
4163        // Only one possible interleaving when one input is empty
4164        assert_eq!(instances, 1);
4165    }
4166
4167    /// Tests that merge_ordered correctly handles feedback cycles.
4168    /// An element output from merge_ordered is filtered and cycled back to
4169    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4170    /// element to arrive and potentially be emitted before elements still
4171    /// waiting on the other input.
4172    #[cfg(feature = "sim")]
4173    #[test]
4174    fn sim_merge_ordered_cycle_back() {
4175        let mut flow = FlowBuilder::new();
4176        let node = flow.process::<()>();
4177
4178        let (in_send, input) = node.sim_input();
4179
4180        // Create a forward ref for the cycle back
4181        let (complete_cycle_back, cycle_back) =
4182            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4183
4184        // merge_ordered: input (external) with cycle_back
4185        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4186
4187        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4188        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4189
4190        let out_recv = merged.sim_output();
4191
4192        // Send 1 and 2. Element 1 should cycle back as 10.
4193        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4194        let mut saw_cycle_before_second = false;
4195        flow.sim().exhaustive(async || {
4196            in_send.send(1);
4197            in_send.send(2);
4198
4199            let out = out_recv.collect::<Vec<_>>().await;
4200
4201            // 10 must always come after 1 (causal dependency)
4202            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4203            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4204            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4205
4206            // Check if we see [1, 10, 2] — the cycled element beats the second input
4207            if out == [1, 10, 2] {
4208                saw_cycle_before_second = true;
4209            }
4210
4211            let mut sorted = out;
4212            sorted.sort();
4213            assert_eq!(sorted, vec![1, 2, 10]);
4214        });
4215
4216        assert!(
4217            saw_cycle_before_second,
4218            "never saw the cycled element arrive before the second input element"
4219        );
4220    }
4221
4222    /// Tests that merge_ordered correctly interleaves when one input has a
4223    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4224    /// element 2 should be able to appear after b's elements.
4225    #[cfg(feature = "sim")]
4226    #[test]
4227    fn sim_merge_ordered_delayed() {
4228        let mut flow = FlowBuilder::new();
4229        let node = flow.process::<()>();
4230
4231        let (in_send, input) = node.sim_input();
4232        let (in_send2, input2) = node.sim_input();
4233
4234        let out_recv = input
4235            .merge_ordered(input2, nondet!(/** test */))
4236            .sim_output();
4237
4238        let mut saw_delayed_interleaving = false;
4239        flow.sim().exhaustive(async || {
4240            // Send 1 from a, and 3, 4 from b
4241            in_send.send(1);
4242            in_send2.send(3);
4243            in_send2.send(4);
4244
4245            // Collect what's available so far
4246            let first_batch = out_recv.collect::<Vec<_>>().await;
4247
4248            // Now send the delayed element 2 from a
4249            in_send.send(2);
4250            let second_batch = out_recv.collect::<Vec<_>>().await;
4251
4252            let mut all: Vec<_> = first_batch
4253                .iter()
4254                .chain(second_batch.iter())
4255                .copied()
4256                .collect();
4257
4258            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4259            if all == [1, 3, 4, 2] {
4260                saw_delayed_interleaving = true;
4261            }
4262
4263            all.sort();
4264            assert_eq!(all, vec![1, 2, 3, 4]);
4265        });
4266
4267        assert!(saw_delayed_interleaving);
4268    }
4269
4270    /// Deploy test: merge_ordered with a delayed element on one input.
4271    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4272    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4273    /// both inputs are pulled and the delayed element arrives later.
4274    #[cfg(feature = "deploy")]
4275    #[tokio::test]
4276    async fn deploy_merge_ordered_delayed() {
4277        let mut deployment = Deployment::new();
4278
4279        let mut flow = FlowBuilder::new();
4280        let node = flow.process::<()>();
4281        let external = flow.external::<()>();
4282
4283        let (input_a_port, input_a) = node.source_external_bincode(&external);
4284        let (input_b_port, input_b) = node.source_external_bincode(&external);
4285
4286        let out = input_a
4287            .assume_ordering(nondet!(/** test */))
4288            .merge_ordered(
4289                input_b.assume_ordering(nondet!(/** test */)),
4290                nondet!(/** test */),
4291            )
4292            .send_bincode_external(&external);
4293
4294        let nodes = flow
4295            .with_process(&node, deployment.Localhost())
4296            .with_external(&external, deployment.Localhost())
4297            .deploy(&mut deployment);
4298
4299        deployment.deploy().await.unwrap();
4300
4301        let mut ext_a = nodes.connect(input_a_port).await;
4302        let mut ext_b = nodes.connect(input_b_port).await;
4303        let mut ext_out = nodes.connect(out).await;
4304
4305        deployment.start().await.unwrap();
4306
4307        // Send a=1, b=3, b=4
4308        ext_a.send(1).await.unwrap();
4309        ext_b.send(3).await.unwrap();
4310        ext_b.send(4).await.unwrap();
4311
4312        // Collect the first 3 elements
4313        let mut received = Vec::new();
4314        for _ in 0..3 {
4315            received.push(ext_out.next().await.unwrap());
4316        }
4317
4318        // Now send the delayed a=2
4319        ext_a.send(2).await.unwrap();
4320        received.push(ext_out.next().await.unwrap());
4321
4322        // All elements should be present
4323        received.sort();
4324        assert_eq!(received, vec![1, 2, 3, 4]);
4325    }
4326
4327    #[cfg(feature = "deploy")]
4328    #[tokio::test]
4329    async fn monotone_fold_threshold() {
4330        use crate::properties::manual_proof;
4331
4332        let mut deployment = Deployment::new();
4333
4334        let mut flow = FlowBuilder::new();
4335        let node = flow.process::<()>();
4336        let external = flow.external::<()>();
4337
4338        let in_unbounded: super::Stream<_, _> =
4339            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4340        let sum = in_unbounded.fold(
4341            q!(|| 0),
4342            q!(
4343                |sum, v| {
4344                    *sum += v;
4345                },
4346                monotone = manual_proof!(/** test */)
4347            ),
4348        );
4349
4350        let threshold_out = sum
4351            .threshold_greater_or_equal(node.singleton(q!(7)))
4352            .send_bincode_external(&external);
4353
4354        let nodes = flow
4355            .with_process(&node, deployment.Localhost())
4356            .with_external(&external, deployment.Localhost())
4357            .deploy(&mut deployment);
4358
4359        deployment.deploy().await.unwrap();
4360
4361        let mut threshold_out = nodes.connect(threshold_out).await;
4362
4363        deployment.start().await.unwrap();
4364
4365        assert_eq!(threshold_out.next().await.unwrap(), 7);
4366    }
4367
4368    #[cfg(feature = "deploy")]
4369    #[tokio::test]
4370    async fn monotone_count_threshold() {
4371        let mut deployment = Deployment::new();
4372
4373        let mut flow = FlowBuilder::new();
4374        let node = flow.process::<()>();
4375        let external = flow.external::<()>();
4376
4377        let in_unbounded: super::Stream<_, _> =
4378            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4379        let sum = in_unbounded.count();
4380
4381        let threshold_out = sum
4382            .threshold_greater_or_equal(node.singleton(q!(3)))
4383            .send_bincode_external(&external);
4384
4385        let nodes = flow
4386            .with_process(&node, deployment.Localhost())
4387            .with_external(&external, deployment.Localhost())
4388            .deploy(&mut deployment);
4389
4390        deployment.deploy().await.unwrap();
4391
4392        let mut threshold_out = nodes.connect(threshold_out).await;
4393
4394        deployment.start().await.unwrap();
4395
4396        assert_eq!(threshold_out.next().await.unwrap(), 3);
4397    }
4398
4399    #[cfg(feature = "deploy")]
4400    #[tokio::test]
4401    async fn monotone_map_order_preserving_threshold() {
4402        use crate::properties::manual_proof;
4403
4404        let mut deployment = Deployment::new();
4405
4406        let mut flow = FlowBuilder::new();
4407        let node = flow.process::<()>();
4408        let external = flow.external::<()>();
4409
4410        let in_unbounded: super::Stream<_, _> =
4411            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4412        let sum = in_unbounded.fold(
4413            q!(|| 0),
4414            q!(
4415                |sum, v| {
4416                    *sum += v;
4417                },
4418                monotone = manual_proof!(/** test */)
4419            ),
4420        );
4421
4422        // map with order_preserving should preserve monotonicity
4423        let doubled = sum.map(q!(
4424            |v| v * 2,
4425            order_preserving = manual_proof!(/** doubling preserves order */)
4426        ));
4427
4428        let threshold_out = doubled
4429            .threshold_greater_or_equal(node.singleton(q!(14)))
4430            .send_bincode_external(&external);
4431
4432        let nodes = flow
4433            .with_process(&node, deployment.Localhost())
4434            .with_external(&external, deployment.Localhost())
4435            .deploy(&mut deployment);
4436
4437        deployment.deploy().await.unwrap();
4438
4439        let mut threshold_out = nodes.connect(threshold_out).await;
4440
4441        deployment.start().await.unwrap();
4442
4443        assert_eq!(threshold_out.next().await.unwrap(), 14);
4444    }
4445
4446    // === Compile-time type tests for join/cross_product ordering ===
4447
4448    #[cfg(any(feature = "deploy", feature = "sim"))]
4449    mod join_ordering_type_tests {
4450        use crate::live_collections::boundedness::{Bounded, Unbounded};
4451        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4452        use crate::location::{Location, Process};
4453
4454        #[expect(dead_code, reason = "compile-time type test")]
4455        fn join_unbounded_with_bounded_preserves_order<'a>(
4456            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4457            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4458        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4459            left.join(right)
4460        }
4461
4462        #[expect(dead_code, reason = "compile-time type test")]
4463        fn join_unbounded_with_unbounded_is_no_order<'a>(
4464            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4465            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4466        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4467            left.join(right)
4468        }
4469
4470        #[expect(dead_code, reason = "compile-time type test")]
4471        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4472            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4473            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4474        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4475            left.join(right)
4476        }
4477
4478        #[expect(dead_code, reason = "compile-time type test")]
4479        fn join_unbounded_noorder_with_bounded<'a>(
4480            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4481            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4482        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4483            left.join(right)
4484        }
4485
4486        // === Compile-time type tests for cross_product ordering ===
4487
4488        #[expect(dead_code, reason = "compile-time type test")]
4489        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4490            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4491            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4492        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4493            left.cross_product(right)
4494        }
4495
4496        #[expect(dead_code, reason = "compile-time type test")]
4497        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4498            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4499            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4500        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4501            left.cross_product(right)
4502        }
4503
4504        #[expect(dead_code, reason = "compile-time type test")]
4505        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4506            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4507            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4508        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4509            left.cross_product(right)
4510        }
4511    } // mod join_ordering_type_tests
4512
4513    // === Runtime correctness tests for bounded join/cross_product ===
4514
4515    #[cfg(feature = "sim")]
4516    #[test]
4517    fn cross_product_mixed_boundedness_correctness() {
4518        use stageleft::q;
4519
4520        use crate::compile::builder::FlowBuilder;
4521        use crate::nondet::nondet;
4522
4523        let mut flow = FlowBuilder::new();
4524        let process = flow.process::<()>();
4525        let tick = process.tick();
4526
4527        let left = process.source_iter(q!(vec![1, 2]));
4528        let right = process
4529            .source_iter(q!(vec!['a', 'b']))
4530            .batch(&tick, nondet!(/** test */))
4531            .all_ticks();
4532
4533        let out = left.cross_product(right).sim_output();
4534
4535        flow.sim().exhaustive(async || {
4536            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4537                .await;
4538        });
4539    }
4540
4541    #[cfg(feature = "sim")]
4542    #[test]
4543    fn join_mixed_boundedness_correctness() {
4544        use stageleft::q;
4545
4546        use crate::compile::builder::FlowBuilder;
4547        use crate::nondet::nondet;
4548
4549        let mut flow = FlowBuilder::new();
4550        let process = flow.process::<()>();
4551        let tick = process.tick();
4552
4553        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4554        let right = process
4555            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4556            .batch(&tick, nondet!(/** test */))
4557            .all_ticks();
4558
4559        let out = left.join(right).sim_output();
4560
4561        flow.sim().exhaustive(async || {
4562            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4563                .await;
4564        });
4565    }
4566}