Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36    ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37    ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
42/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
43#[sealed::sealed]
44pub trait Ordering:
45    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
46{
47    /// The [`StreamOrder`] corresponding to this type.
48    const ORDERING_KIND: StreamOrder;
49}
50
51/// Marks the stream as being totally ordered, which means that there are
52/// no sources of non-determinism (other than intentional ones) that will
53/// affect the order of elements.
54pub enum TotalOrder {}
55
56#[sealed::sealed]
57impl Ordering for TotalOrder {
58    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
59}
60
61/// Marks the stream as having no order, which means that the order of
62/// elements may be affected by non-determinism.
63///
64/// This restricts certain operators, such as `fold` and `reduce`, to only
65/// be used with commutative aggregation functions.
66pub enum NoOrder {}
67
68#[sealed::sealed]
69impl Ordering for NoOrder {
70    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
71}
72
73/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
74/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
75/// have `Self` guarantees instead.
76#[sealed::sealed]
77pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
78#[sealed::sealed]
79impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
81/// Helper trait for determining the weakest of two orderings.
82#[sealed::sealed]
83pub trait MinOrder<Other: ?Sized> {
84    /// The weaker of the two orderings.
85    type Min: Ordering;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for TotalOrder {
90    type Min = O;
91}
92
93#[sealed::sealed]
94impl<O: Ordering> MinOrder<O> for NoOrder {
95    type Min = NoOrder;
96}
97
98/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
99#[sealed::sealed]
100pub trait Retries:
101    MinRetries<Self, Min = Self>
102    + MinRetries<ExactlyOnce, Min = Self>
103    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
104{
105    /// The [`StreamRetry`] corresponding to this type.
106    const RETRIES_KIND: StreamRetry;
107}
108
109/// Marks the stream as having deterministic message cardinality, with no
110/// possibility of duplicates.
111pub enum ExactlyOnce {}
112
113#[sealed::sealed]
114impl Retries for ExactlyOnce {
115    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
116}
117
118/// Marks the stream as having non-deterministic message cardinality, which
119/// means that duplicates may occur, but messages will not be dropped.
120pub enum AtLeastOnce {}
121
122#[sealed::sealed]
123impl Retries for AtLeastOnce {
124    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
125}
126
127/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
128/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
129/// have `Self` guarantees instead.
130#[sealed::sealed]
131pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
132#[sealed::sealed]
133impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
135/// Helper trait for determining the weakest of two retry guarantees.
136#[sealed::sealed]
137pub trait MinRetries<Other: ?Sized> {
138    /// The weaker of the two retry guarantees.
139    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for ExactlyOnce {
144    type Min = R;
145}
146
147#[sealed::sealed]
148impl<R: Retries> MinRetries<R> for AtLeastOnce {
149    type Min = AtLeastOnce;
150}
151
152#[sealed::sealed]
153#[diagnostic::on_unimplemented(
154    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
155    label = "required here",
156    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
157)]
158/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
159pub trait IsOrdered: Ordering {}
160
161#[sealed::sealed]
162#[diagnostic::do_not_recommend]
163impl IsOrdered for TotalOrder {}
164
165#[sealed::sealed]
166#[diagnostic::on_unimplemented(
167    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
168    label = "required here",
169    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
170)]
171/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
172pub trait IsExactlyOnce: Retries {}
173
174#[sealed::sealed]
175#[diagnostic::do_not_recommend]
176impl IsExactlyOnce for ExactlyOnce {}
177
178/// Streaming sequence of elements with type `Type`.
179///
180/// This live collection represents a growing sequence of elements, with new elements being
181/// asynchronously appended to the end of the sequence. This can be used to model the arrival
182/// of network input, such as API requests, or streaming ingestion.
183///
184/// By default, all streams have deterministic ordering and each element is materialized exactly
185/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
186/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
187/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
188///
189/// Type Parameters:
190/// - `Type`: the type of elements in the stream
191/// - `Loc`: the location where the stream is being materialized
192/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
193/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
194///   (default is [`TotalOrder`])
195/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
196///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
197pub struct Stream<
198    Type,
199    Loc,
200    Bound: Boundedness = Unbounded,
201    Order: Ordering = TotalOrder,
202    Retry: Retries = ExactlyOnce,
203> {
204    pub(crate) location: Loc,
205    pub(crate) ir_node: RefCell<HydroNode>,
206    pub(crate) flow_state: FlowState,
207
208    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
209}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212    fn drop(&mut self) {
213        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216                input: Box::new(ir_node),
217                op_metadata: HydroIrOpMetadata::new(),
218            });
219        }
220    }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224    for Stream<T, L, Unbounded, O, R>
225where
226    L: Location<'a>,
227{
228    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229        let new_meta = stream
230            .location
231            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233        Stream {
234            location: stream.location.clone(),
235            flow_state: stream.flow_state.clone(),
236            ir_node: RefCell::new(HydroNode::Cast {
237                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238                metadata: new_meta,
239            }),
240            _phantom: PhantomData,
241        }
242    }
243}
244
245impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
246    for Stream<T, L, B, NoOrder, R>
247where
248    L: Location<'a>,
249{
250    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
251        stream.weaken_ordering()
252    }
253}
254
255impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
256    for Stream<T, L, B, O, AtLeastOnce>
257where
258    L: Location<'a>,
259{
260    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
261        stream.weaken_retries()
262    }
263}
264
265impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
266where
267    L: Location<'a>,
268{
269    fn defer_tick(self) -> Self {
270        Stream::defer_tick(self)
271    }
272}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275    for Stream<T, Tick<L>, Bounded, O, R>
276where
277    L: Location<'a>,
278{
279    type Location = Tick<L>;
280
281    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282        Stream::new(
283            location.clone(),
284            HydroNode::CycleSource {
285                cycle_id,
286                metadata: location.new_node_metadata(Self::collection_kind()),
287            },
288        )
289    }
290}
291
292impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
293    for Stream<T, Tick<L>, Bounded, O, R>
294where
295    L: Location<'a>,
296{
297    type Location = Tick<L>;
298
299    fn location(&self) -> &Self::Location {
300        self.location()
301    }
302
303    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
304        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
305            location.clone(),
306            HydroNode::DeferTick {
307                input: Box::new(HydroNode::CycleSource {
308                    cycle_id,
309                    metadata: location.new_node_metadata(Self::collection_kind()),
310                }),
311                metadata: location.new_node_metadata(Self::collection_kind()),
312            },
313        );
314
315        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
316    }
317}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320    for Stream<T, Tick<L>, Bounded, O, R>
321where
322    L: Location<'a>,
323{
324    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325        assert_eq!(
326            Location::id(&self.location),
327            expected_location,
328            "locations do not match"
329        );
330        self.location
331            .flow_state()
332            .borrow_mut()
333            .push_root(HydroRoot::CycleSink {
334                cycle_id,
335                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336                op_metadata: HydroIrOpMetadata::new(),
337            });
338    }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342    for Stream<T, L, B, O, R>
343where
344    L: Location<'a>,
345{
346    type Location = L;
347
348    fn create_source(cycle_id: CycleId, location: L) -> Self {
349        Stream::new(
350            location.clone(),
351            HydroNode::CycleSource {
352                cycle_id,
353                metadata: location.new_node_metadata(Self::collection_kind()),
354            },
355        )
356    }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360    for Stream<T, L, B, O, R>
361where
362    L: Location<'a>,
363{
364    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365        assert_eq!(
366            Location::id(&self.location),
367            expected_location,
368            "locations do not match"
369        );
370        self.location
371            .flow_state()
372            .borrow_mut()
373            .push_root(HydroRoot::CycleSink {
374                cycle_id,
375                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376                op_metadata: HydroIrOpMetadata::new(),
377            });
378    }
379}
380
381impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
382where
383    T: Clone,
384    L: Location<'a>,
385{
386    fn clone(&self) -> Self {
387        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
388            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
389            *self.ir_node.borrow_mut() = HydroNode::Tee {
390                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
391                metadata: self.location.new_node_metadata(Self::collection_kind()),
392            };
393        }
394
395        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
396            unreachable!()
397        };
398        Stream {
399            location: self.location.clone(),
400            flow_state: self.flow_state.clone(),
401            ir_node: HydroNode::Tee {
402                inner: SharedNode(inner.0.clone()),
403                metadata: metadata.clone(),
404            }
405            .into(),
406            _phantom: PhantomData,
407        }
408    }
409}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413    L: Location<'a>,
414{
415    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419        let flow_state = location.flow_state().clone();
420        Stream {
421            location,
422            flow_state,
423            ir_node: RefCell::new(ir_node),
424            _phantom: PhantomData,
425        }
426    }
427
428    /// Returns the [`Location`] where this stream is being materialized.
429    pub fn location(&self) -> &L {
430        &self.location
431    }
432
433    /// Creates a shared reference handle to this stream's handoff buffer that can be captured
434    /// inside `q!()` closures. The handle resolves to `&Vec<T>` at runtime.
435    ///
436    /// The stream must be bounded, otherwise reading it would be non-deterministic.
437    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
438    where
439        B: IsBounded,
440    {
441        crate::handoff_ref::StreamRef::new(&self.ir_node)
442    }
443
444    /// Returns a mutable reference handle to this stream's handoff buffer that can be captured
445    /// inside `q!()` closures. The handle resolves to `&mut Vec<T>` at runtime.
446    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
447    where
448        B: IsBounded,
449    {
450        crate::handoff_ref::StreamMut::new(&self.ir_node)
451    }
452
453    /// Weakens the consistency of this live collection to not guarantee any consistency across
454    /// cluster members (if this collection is on a cluster).
455    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456    where
457        L: Location<'a>,
458    {
459        if L::consistency()
460            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461        {
462            // already no consistency
463            Stream::new(
464                self.location.drop_consistency(),
465                self.ir_node.replace(HydroNode::Placeholder),
466            )
467        } else {
468            Stream::new(
469                self.location.drop_consistency(),
470                HydroNode::Cast {
471                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473                        T,
474                        L::DropConsistency,
475                        B,
476                        O,
477                        R,
478                    >::collection_kind(
479                    )),
480                },
481            )
482        }
483    }
484
485    /// Casts this live collection to have the consistency guarantees specified in the given
486    /// location type parameter. The developer must ensure that the strengthened consistency
487    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
488    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
489        self,
490        _proof: impl crate::properties::ConsistencyProof,
491    ) -> Stream<T, L2, B, O, R>
492    where
493        L: Location<'a>,
494    {
495        if L::consistency() == L2::consistency() {
496            Stream::new(
497                self.location.with_consistency_of(),
498                self.ir_node.replace(HydroNode::Placeholder),
499            )
500        } else {
501            Stream::new(
502                self.location.with_consistency_of(),
503                HydroNode::AssertIsConsistent {
504                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                    trusted: false,
506                    metadata: self
507                        .location
508                        .clone()
509                        .with_consistency_of::<L2>()
510                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
511                },
512            )
513        }
514    }
515
516    pub(crate) fn assert_has_consistency_of_trusted<
517        L2: Location<'a, DropConsistency = L::DropConsistency>,
518    >(
519        self,
520        _proof: impl crate::properties::ConsistencyProof,
521    ) -> Stream<T, L2, B, O, R>
522    where
523        L: Location<'a>,
524    {
525        if L::consistency() == L2::consistency() {
526            Stream::new(
527                self.location.with_consistency_of(),
528                self.ir_node.replace(HydroNode::Placeholder),
529            )
530        } else {
531            Stream::new(
532                self.location.with_consistency_of(),
533                HydroNode::AssertIsConsistent {
534                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
535                    trusted: true,
536                    metadata: self
537                        .location
538                        .clone()
539                        .with_consistency_of::<L2>()
540                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
541                },
542            )
543        }
544    }
545
546    pub(crate) fn collection_kind() -> CollectionKind {
547        CollectionKind::Stream {
548            bound: B::BOUND_KIND,
549            order: O::ORDERING_KIND,
550            retry: R::RETRIES_KIND,
551            element_type: quote_type::<T>().into(),
552        }
553    }
554
555    /// Produces a stream based on invoking `f` on each element.
556    /// If you do not want to modify the stream and instead only want to view
557    /// each item use [`Stream::inspect`] instead.
558    ///
559    /// # Example
560    /// ```rust
561    /// # #[cfg(feature = "deploy")] {
562    /// # use hydro_lang::prelude::*;
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565    /// let words = process.source_iter(q!(vec!["hello", "world"]));
566    /// words.map(q!(|x| x.to_uppercase()))
567    /// # }, |mut stream| async move {
568    /// # for w in vec!["HELLO", "WORLD"] {
569    /// #     assert_eq!(stream.next().await.unwrap(), w);
570    /// # }
571    /// # }));
572    /// # }
573    /// ```
574    pub fn map<U, F, C, I, const WAS_MUT: bool>(
575        self,
576        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577    ) -> Stream<U, L, B, O, R>
578    where
579        F: FnMut(T) -> U + 'a,
580        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582    {
583        let f = crate::handoff_ref::with_ref_capture(|| {
584            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585            proof.register_proof(&expr);
586            expr.into()
587        });
588        Stream::new(
589            self.location.clone(),
590            HydroNode::Map {
591                f,
592                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593                metadata: self
594                    .location
595                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596            },
597        )
598    }
599
600    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
601    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
602    /// for the output type `U` must produce items in a **deterministic** order.
603    ///
604    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
605    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
606    ///
607    /// # Example
608    /// ```rust
609    /// # #[cfg(feature = "deploy")] {
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// process
614    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
615    ///     .flat_map_ordered(q!(|x| x))
616    /// # }, |mut stream| async move {
617    /// // 1, 2, 3, 4
618    /// # for w in (1..5) {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// # }
623    /// ```
624    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625        self,
626        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627    ) -> Stream<U, L, B, O, R>
628    where
629        I: IntoIterator<Item = U>,
630        F: FnMut(T) -> I + 'a,
631        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633    {
634        let f = crate::handoff_ref::with_ref_capture(|| {
635            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636            proof.register_proof(&expr);
637            expr.into()
638        });
639        Stream::new(
640            self.location.clone(),
641            HydroNode::FlatMap {
642                f,
643                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644                metadata: self
645                    .location
646                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647            },
648        )
649    }
650
651    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
652    /// for the output type `U` to produce items in any order.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
660    /// process
661    ///     .source_iter(q!(vec![
662    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
663    ///         std::collections::HashSet::from_iter(vec![3, 4]),
664    ///     ]))
665    ///     .flat_map_unordered(q!(|x| x))
666    /// # }, |mut stream| async move {
667    /// // 1, 2, 3, 4, but in no particular order
668    /// # let mut results = Vec::new();
669    /// # for w in (1..5) {
670    /// #     results.push(stream.next().await.unwrap());
671    /// # }
672    /// # results.sort();
673    /// # assert_eq!(results, vec![1, 2, 3, 4]);
674    /// # }));
675    /// # }
676    /// ```
677    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678        self,
679        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680    ) -> Stream<U, L, B, NoOrder, R>
681    where
682        I: IntoIterator<Item = U>,
683        F: FnMut(T) -> I + 'a,
684        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686    {
687        let f = crate::handoff_ref::with_ref_capture(|| {
688            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689            proof.register_proof(&expr);
690            expr.into()
691        });
692        Stream::new(
693            self.location.clone(),
694            HydroNode::FlatMap {
695                f,
696                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697                metadata: self
698                    .location
699                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700            },
701        )
702    }
703
704    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
705    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
706    ///
707    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
708    /// not deterministic, use [`Stream::flatten_unordered`] instead.
709    ///
710    /// ```rust
711    /// # #[cfg(feature = "deploy")] {
712    /// # use hydro_lang::prelude::*;
713    /// # use futures::StreamExt;
714    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
715    /// process
716    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
717    ///     .flatten_ordered()
718    /// # }, |mut stream| async move {
719    /// // 1, 2, 3, 4
720    /// # for w in (1..5) {
721    /// #     assert_eq!(stream.next().await.unwrap(), w);
722    /// # }
723    /// # }));
724    /// # }
725    /// ```
726    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
727    where
728        T: IntoIterator<Item = U>,
729    {
730        self.flat_map_ordered(q!(|d| d))
731    }
732
733    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
734    /// for the element type `T` to produce items in any order.
735    ///
736    /// # Example
737    /// ```rust
738    /// # #[cfg(feature = "deploy")] {
739    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
740    /// # use futures::StreamExt;
741    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
742    /// process
743    ///     .source_iter(q!(vec![
744    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
745    ///         std::collections::HashSet::from_iter(vec![3, 4]),
746    ///     ]))
747    ///     .flatten_unordered()
748    /// # }, |mut stream| async move {
749    /// // 1, 2, 3, 4, but in no particular order
750    /// # let mut results = Vec::new();
751    /// # for w in (1..5) {
752    /// #     results.push(stream.next().await.unwrap());
753    /// # }
754    /// # results.sort();
755    /// # assert_eq!(results, vec![1, 2, 3, 4]);
756    /// # }));
757    /// # }
758    /// ```
759    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
760    where
761        T: IntoIterator<Item = U>,
762    {
763        self.flat_map_unordered(q!(|d| d))
764    }
765
766    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
767    /// then emit the elements of that stream one by one. When the inner stream yields
768    /// `Pending`, this operator yields as well.
769    pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770        self,
771        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772    ) -> Stream<U, L, B, O, R>
773    where
774        S: futures::Stream<Item = U>,
775        F: FnMut(T) -> S + 'a,
776        C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777        Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778    {
779        let f = crate::handoff_ref::with_ref_capture(|| {
780            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781            proof.register_proof(&expr);
782            expr.into()
783        });
784        Stream::new(
785            self.location.clone(),
786            HydroNode::FlatMapStreamBlocking {
787                f,
788                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789                metadata: self
790                    .location
791                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792            },
793        )
794    }
795
796    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
797    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
798    /// yields as well.
799    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
800    where
801        T: futures::Stream<Item = U>,
802    {
803        self.flat_map_stream_blocking(q!(|d| d))
804    }
805
806    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
807    /// `f`, preserving the order of the elements.
808    ///
809    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
810    /// not modify or take ownership of the values. If you need to modify the values while filtering
811    /// use [`Stream::filter_map`] instead.
812    ///
813    /// # Example
814    /// ```rust
815    /// # #[cfg(feature = "deploy")] {
816    /// # use hydro_lang::prelude::*;
817    /// # use futures::StreamExt;
818    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
819    /// process
820    ///     .source_iter(q!(vec![1, 2, 3, 4]))
821    ///     .filter(q!(|&x| x > 2))
822    /// # }, |mut stream| async move {
823    /// // 3, 4
824    /// # for w in (3..5) {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831        self,
832        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833    ) -> Self
834    where
835        F: FnMut(&T) -> bool + 'a,
836        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838    {
839        let f = crate::handoff_ref::with_ref_capture(|| {
840            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841            proof.register_proof(&expr);
842            expr.into()
843        });
844        Stream::new(
845            self.location.clone(),
846            HydroNode::Filter {
847                f,
848                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849                metadata: self.location.new_node_metadata(Self::collection_kind()),
850            },
851        )
852    }
853
854    /// Splits the stream into two streams based on a predicate, without cloning elements.
855    ///
856    /// Elements for which `f` returns `true` are sent to the first output stream,
857    /// and elements for which `f` returns `false` are sent to the second output stream.
858    ///
859    /// Unlike using `filter` twice, this only evaluates the predicate once per element
860    /// and does not require `T: Clone`.
861    ///
862    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
863    /// the predicate is only used for routing; the element itself is moved to the
864    /// appropriate output stream.
865    ///
866    /// # Example
867    /// ```rust
868    /// # #[cfg(feature = "deploy")] {
869    /// # use hydro_lang::prelude::*;
870    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
871    /// # use futures::StreamExt;
872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
873    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
874    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
875    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
876    /// evens.map(q!(|x| (x, true)))
877    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
878    /// # }, |mut stream| async move {
879    /// # let mut results = Vec::new();
880    /// # for _ in 0..6 {
881    /// #     results.push(stream.next().await.unwrap());
882    /// # }
883    /// # results.sort();
884    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
885    /// # }));
886    /// # }
887    /// ```
888    #[expect(
889        clippy::type_complexity,
890        reason = "return type mirrors the input stream type"
891    )]
892    pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
893        self,
894        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
895    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
896    where
897        F: FnMut(&T) -> bool + 'a,
898        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
899        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
900    {
901        let f = crate::handoff_ref::with_ref_capture(|| {
902            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
903            proof.register_proof(&expr);
904            expr.into()
905        });
906        let shared = SharedNode(Rc::new(RefCell::new(
907            self.ir_node.replace(HydroNode::Placeholder),
908        )));
909
910        let true_stream = Stream::new(
911            self.location.clone(),
912            HydroNode::Partition {
913                inner: SharedNode(shared.0.clone()),
914                f: f.clone(),
915                is_true: true,
916                metadata: self.location.new_node_metadata(Self::collection_kind()),
917            },
918        );
919
920        let false_stream = Stream::new(
921            self.location.clone(),
922            HydroNode::Partition {
923                inner: SharedNode(shared.0),
924                f,
925                is_true: false,
926                metadata: self.location.new_node_metadata(Self::collection_kind()),
927            },
928        );
929
930        (true_stream, false_stream)
931    }
932
933    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
934    ///
935    /// # Example
936    /// ```rust
937    /// # #[cfg(feature = "deploy")] {
938    /// # use hydro_lang::prelude::*;
939    /// # use futures::StreamExt;
940    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
941    /// process
942    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
943    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
944    /// # }, |mut stream| async move {
945    /// // 1, 2
946    /// # for w in (1..3) {
947    /// #     assert_eq!(stream.next().await.unwrap(), w);
948    /// # }
949    /// # }));
950    /// # }
951    /// ```
952    pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
953        self,
954        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
955    ) -> Stream<U, L, B, O, R>
956    where
957        F: FnMut(T) -> Option<U> + 'a,
958        C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
959        Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
960    {
961        let f = crate::handoff_ref::with_ref_capture(|| {
962            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
963            proof.register_proof(&expr);
964            expr.into()
965        });
966        Stream::new(
967            self.location.clone(),
968            HydroNode::FilterMap {
969                f,
970                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
971                metadata: self
972                    .location
973                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
974            },
975        )
976    }
977
978    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
979    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
980    /// If `other` is an empty [`Optional`], no values will be produced.
981    ///
982    /// # Example
983    /// ```rust
984    /// # #[cfg(feature = "deploy")] {
985    /// # use hydro_lang::prelude::*;
986    /// # use futures::StreamExt;
987    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
988    /// let tick = process.tick();
989    /// let batch = process
990    ///   .source_iter(q!(vec![1, 2, 3, 4]))
991    ///   .batch(&tick, nondet!(/** test */));
992    /// let count = batch.clone().count(); // `count()` returns a singleton
993    /// batch.cross_singleton(count).all_ticks()
994    /// # }, |mut stream| async move {
995    /// // (1, 4), (2, 4), (3, 4), (4, 4)
996    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
997    /// #     assert_eq!(stream.next().await.unwrap(), w);
998    /// # }
999    /// # }));
1000    /// # }
1001    /// ```
1002    pub fn cross_singleton<O2>(
1003        self,
1004        other: impl Into<Optional<O2, L, Bounded>>,
1005    ) -> Stream<(T, O2), L, B, O, R>
1006    where
1007        O2: Clone,
1008    {
1009        let other: Optional<O2, L, Bounded> = other.into();
1010        check_matching_location(&self.location, &other.location);
1011
1012        Stream::new(
1013            self.location.clone(),
1014            HydroNode::CrossSingleton {
1015                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1016                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1017                metadata: self
1018                    .location
1019                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1020            },
1021        )
1022    }
1023
1024    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
1025    ///
1026    /// # Example
1027    /// ```rust
1028    /// # #[cfg(feature = "deploy")] {
1029    /// # use hydro_lang::prelude::*;
1030    /// # use futures::StreamExt;
1031    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1032    /// let tick = process.tick();
1033    /// // ticks are lazy by default, forces the second tick to run
1034    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1035    ///
1036    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
1037    /// let batch_first_tick = process
1038    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1039    ///   .batch(&tick, nondet!(/** test */));
1040    /// let batch_second_tick = process
1041    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1042    ///   .batch(&tick, nondet!(/** test */))
1043    ///   .defer_tick();
1044    /// batch_first_tick.chain(batch_second_tick)
1045    ///   .filter_if(signal)
1046    ///   .all_ticks()
1047    /// # }, |mut stream| async move {
1048    /// // [1, 2, 3, 4]
1049    /// # for w in vec![1, 2, 3, 4] {
1050    /// #     assert_eq!(stream.next().await.unwrap(), w);
1051    /// # }
1052    /// # }));
1053    /// # }
1054    /// ```
1055    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1056        self.cross_singleton(signal.filter(q!(|b| *b)))
1057            .map(q!(|(d, _)| d))
1058    }
1059
1060    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1061    ///
1062    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1063    /// leader of a cluster.
1064    ///
1065    /// # Example
1066    /// ```rust
1067    /// # #[cfg(feature = "deploy")] {
1068    /// # use hydro_lang::prelude::*;
1069    /// # use futures::StreamExt;
1070    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1071    /// let tick = process.tick();
1072    /// // ticks are lazy by default, forces the second tick to run
1073    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1074    ///
1075    /// let batch_first_tick = process
1076    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1077    ///   .batch(&tick, nondet!(/** test */));
1078    /// let batch_second_tick = process
1079    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1080    ///   .batch(&tick, nondet!(/** test */))
1081    ///   .defer_tick(); // appears on the second tick
1082    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1083    /// batch_first_tick.chain(batch_second_tick)
1084    ///   .filter_if_some(some_on_first_tick)
1085    ///   .all_ticks()
1086    /// # }, |mut stream| async move {
1087    /// // [1, 2, 3, 4]
1088    /// # for w in vec![1, 2, 3, 4] {
1089    /// #     assert_eq!(stream.next().await.unwrap(), w);
1090    /// # }
1091    /// # }));
1092    /// # }
1093    /// ```
1094    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1095    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1096        self.filter_if(signal.is_some())
1097    }
1098
1099    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1100    ///
1101    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1102    /// some local state.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::prelude::*;
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110    /// let tick = process.tick();
1111    /// // ticks are lazy by default, forces the second tick to run
1112    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1113    ///
1114    /// let batch_first_tick = process
1115    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1116    ///   .batch(&tick, nondet!(/** test */));
1117    /// let batch_second_tick = process
1118    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1119    ///   .batch(&tick, nondet!(/** test */))
1120    ///   .defer_tick(); // appears on the second tick
1121    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1122    /// batch_first_tick.chain(batch_second_tick)
1123    ///   .filter_if_none(some_on_first_tick)
1124    ///   .all_ticks()
1125    /// # }, |mut stream| async move {
1126    /// // [5, 6, 7, 8]
1127    /// # for w in vec![5, 6, 7, 8] {
1128    /// #     assert_eq!(stream.next().await.unwrap(), w);
1129    /// # }
1130    /// # }));
1131    /// # }
1132    /// ```
1133    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1134    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1135        self.filter_if(other.is_none())
1136    }
1137
1138    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1139    /// returning all tupled pairs.
1140    ///
1141    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1142    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1143    /// symmetric hash join is used and ordering is [`NoOrder`].
1144    ///
1145    /// # Example
1146    /// ```rust
1147    /// # #[cfg(feature = "deploy")] {
1148    /// # use hydro_lang::prelude::*;
1149    /// # use std::collections::HashSet;
1150    /// # use futures::StreamExt;
1151    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1152    /// let tick = process.tick();
1153    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1154    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1155    /// stream1.cross_product(stream2)
1156    /// # }, |mut stream| async move {
1157    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1158    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1159    /// # stream.map(|i| assert!(expected.contains(&i)));
1160    /// # }));
1161    /// # }
1162    #[expect(
1163        clippy::type_complexity,
1164        reason = "MinRetries projection in return type"
1165    )]
1166    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1167        self,
1168        other: Stream<T2, L, B2, O2, R2>,
1169    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1170    where
1171        T: Clone,
1172        T2: Clone,
1173        R: MinRetries<R2>,
1174    {
1175        self.map(q!(|v| ((), v)))
1176            .join(other.map(q!(|v| ((), v))))
1177            .map(q!(|((), (v1, v2))| (v1, v2)))
1178    }
1179
1180    /// Takes one stream as input and filters out any duplicate occurrences. The output
1181    /// contains all unique values from the input.
1182    ///
1183    /// # Example
1184    /// ```rust
1185    /// # #[cfg(feature = "deploy")] {
1186    /// # use hydro_lang::prelude::*;
1187    /// # use futures::StreamExt;
1188    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1189    /// let tick = process.tick();
1190    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1191    /// # }, |mut stream| async move {
1192    /// # for w in vec![1, 2, 3, 4] {
1193    /// #     assert_eq!(stream.next().await.unwrap(), w);
1194    /// # }
1195    /// # }));
1196    /// # }
1197    /// ```
1198    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1199    where
1200        T: Eq + Hash,
1201    {
1202        Stream::new(
1203            self.location.clone(),
1204            HydroNode::Unique {
1205                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1206                metadata: self
1207                    .location
1208                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1209            },
1210        )
1211    }
1212
1213    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1214    ///
1215    /// The `other` stream must be [`Bounded`], since this function will wait until
1216    /// all its elements are available before producing any output.
1217    /// # Example
1218    /// ```rust
1219    /// # #[cfg(feature = "deploy")] {
1220    /// # use hydro_lang::prelude::*;
1221    /// # use futures::StreamExt;
1222    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1223    /// let tick = process.tick();
1224    /// let stream = process
1225    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1226    ///   .batch(&tick, nondet!(/** test */));
1227    /// let batch = process
1228    ///   .source_iter(q!(vec![1, 2]))
1229    ///   .batch(&tick, nondet!(/** test */));
1230    /// stream.filter_not_in(batch).all_ticks()
1231    /// # }, |mut stream| async move {
1232    /// # for w in vec![3, 4] {
1233    /// #     assert_eq!(stream.next().await.unwrap(), w);
1234    /// # }
1235    /// # }));
1236    /// # }
1237    /// ```
1238    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1239    where
1240        T: Eq + Hash,
1241        B2: IsBounded,
1242    {
1243        check_matching_location(&self.location, &other.location);
1244
1245        Stream::new(
1246            self.location.clone(),
1247            HydroNode::Difference {
1248                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1249                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1250                metadata: self
1251                    .location
1252                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1253            },
1254        )
1255    }
1256
1257    /// An operator which allows you to "inspect" each element of a stream without
1258    /// modifying it. The closure `f` is called on a reference to each item. This is
1259    /// mainly useful for debugging, and should not be used to generate side-effects.
1260    ///
1261    /// # Example
1262    /// ```rust
1263    /// # #[cfg(feature = "deploy")] {
1264    /// # use hydro_lang::prelude::*;
1265    /// # use futures::StreamExt;
1266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1267    /// let nums = process.source_iter(q!(vec![1, 2]));
1268    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1269    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1270    /// # }, |mut stream| async move {
1271    /// # for w in vec![1, 2] {
1272    /// #     assert_eq!(stream.next().await.unwrap(), w);
1273    /// # }
1274    /// # }));
1275    /// # }
1276    /// ```
1277    pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1278        self,
1279        f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1280    ) -> Self
1281    where
1282        F: FnMut(&T) + 'a,
1283        C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1284        Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1285    {
1286        let f = crate::handoff_ref::with_ref_capture(|| {
1287            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1288            proof.register_proof(&expr);
1289            expr.into()
1290        });
1291
1292        Stream::new(
1293            self.location.clone(),
1294            HydroNode::Inspect {
1295                f,
1296                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1297                metadata: self.location.new_node_metadata(Self::collection_kind()),
1298            },
1299        )
1300    }
1301
1302    /// Executes the provided closure for every element in this stream.
1303    ///
1304    /// Because the closure may have side effects, the stream must have deterministic order
1305    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1306    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1307    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1308    ///
1309    /// The closure may capture singletons via `by_ref()` or `by_mut()`. No commutativity
1310    /// or idempotence proofs are needed because the `TotalOrder + ExactlyOnce` requirements
1311    /// already guarantee deterministic execution.
1312    pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1313    where
1314        O: IsOrdered,
1315        R: IsExactlyOnce,
1316    {
1317        let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1318        self.location
1319            .flow_state()
1320            .borrow_mut()
1321            .push_root(HydroRoot::ForEach {
1322                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1323                f,
1324                op_metadata: HydroIrOpMetadata::new(),
1325            });
1326    }
1327
1328    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1329    /// TCP socket to some other server. You should _not_ use this API for interacting with
1330    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1331    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1332    /// interaction with asynchronous sinks.
1333    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1334    where
1335        O: IsOrdered,
1336        R: IsExactlyOnce,
1337        S: 'a + futures::Sink<T> + Unpin,
1338    {
1339        self.location
1340            .flow_state()
1341            .borrow_mut()
1342            .push_root(HydroRoot::DestSink {
1343                sink: sink.splice_typed_ctx(&self.location).into(),
1344                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1345                op_metadata: HydroIrOpMetadata::new(),
1346            });
1347    }
1348
1349    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1350    ///
1351    /// # Example
1352    /// ```rust
1353    /// # #[cfg(feature = "deploy")] {
1354    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1355    /// # use futures::StreamExt;
1356    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1357    /// let tick = process.tick();
1358    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1359    /// numbers.enumerate()
1360    /// # }, |mut stream| async move {
1361    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1362    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1363    /// #     assert_eq!(stream.next().await.unwrap(), w);
1364    /// # }
1365    /// # }));
1366    /// # }
1367    /// ```
1368    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1369    where
1370        O: IsOrdered,
1371        R: IsExactlyOnce,
1372    {
1373        Stream::new(
1374            self.location.clone(),
1375            HydroNode::Enumerate {
1376                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1377                metadata: self.location.new_node_metadata(Stream::<
1378                    (usize, T),
1379                    L,
1380                    B,
1381                    TotalOrder,
1382                    ExactlyOnce,
1383                >::collection_kind()),
1384            },
1385        )
1386    }
1387
1388    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1389    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1390    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1391    ///
1392    /// Depending on the input stream guarantees, the closure may need to be commutative
1393    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1394    ///
1395    /// # Example
1396    /// ```rust
1397    /// # #[cfg(feature = "deploy")] {
1398    /// # use hydro_lang::prelude::*;
1399    /// # use futures::StreamExt;
1400    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1401    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1402    /// words
1403    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1404    ///     .into_stream()
1405    /// # }, |mut stream| async move {
1406    /// // "HELLOWORLD"
1407    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1408    /// # }));
1409    /// # }
1410    /// ```
1411    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1412        self,
1413        init: impl IntoQuotedMut<'a, I, L>,
1414        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1415    ) -> Singleton<A, L, B2>
1416    where
1417        I: Fn() -> A + 'a,
1418        F: 'a + Fn(&mut A, T),
1419        C: ValidCommutativityFor<O>,
1420        Idemp: ValidIdempotenceFor<R>,
1421        B: ApplyMonotoneStream<M, B2>,
1422    {
1423        let init = init.splice_fn0_ctx(&self.location).into();
1424        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1425        proof.register_proof(&comb);
1426
1427        // Only assume_retries (for idempotence), not assume_ordering.
1428        // The fold hook in the simulator handles ordering non-determinism directly.
1429        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1430        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1431
1432        let core = HydroNode::Fold {
1433            init,
1434            acc: comb.into(),
1435            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1436            metadata: retried
1437                .location
1438                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1439            // we do not guarantee consistency at this point because if the algebraic properties
1440            // do not hold in practice, replica consistency may fail to be maintained, so we
1441            // would like the simulator to assert consistency; in the future, this will be dynamic
1442            // based on the proof mechanism
1443        };
1444
1445        Singleton::new(retried.location.clone(), core)
1446            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1447    }
1448
1449    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1450    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1451    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1452    /// reference, so that it can be modified in place.
1453    ///
1454    /// Depending on the input stream guarantees, the closure may need to be commutative
1455    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1456    ///
1457    /// # Example
1458    /// ```rust
1459    /// # #[cfg(feature = "deploy")] {
1460    /// # use hydro_lang::prelude::*;
1461    /// # use futures::StreamExt;
1462    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1463    /// let bools = process.source_iter(q!(vec![false, true, false]));
1464    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1465    /// # }, |mut stream| async move {
1466    /// // true
1467    /// # assert_eq!(stream.next().await.unwrap(), true);
1468    /// # }));
1469    /// # }
1470    /// ```
1471    pub fn reduce<F, C, Idemp>(
1472        self,
1473        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1474    ) -> Optional<T, L, B>
1475    where
1476        F: Fn(&mut T, T) + 'a,
1477        C: ValidCommutativityFor<O>,
1478        Idemp: ValidIdempotenceFor<R>,
1479    {
1480        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1481        proof.register_proof(&f);
1482
1483        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1484        let ordered_etc: Stream<T, L::DropConsistency, B> =
1485            self.assume_retries(nondet).assume_ordering(nondet);
1486
1487        let core = HydroNode::Reduce {
1488            f: f.into(),
1489            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1490            metadata: ordered_etc
1491                .location
1492                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1493        };
1494
1495        Optional::new(ordered_etc.location.clone(), core)
1496            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1497    }
1498
1499    /// Computes the maximum element in the stream as an [`Optional`], which
1500    /// will be empty until the first element in the input arrives.
1501    ///
1502    /// # Example
1503    /// ```rust
1504    /// # #[cfg(feature = "deploy")] {
1505    /// # use hydro_lang::prelude::*;
1506    /// # use futures::StreamExt;
1507    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1508    /// let tick = process.tick();
1509    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1510    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1511    /// batch.max().all_ticks()
1512    /// # }, |mut stream| async move {
1513    /// // 4
1514    /// # assert_eq!(stream.next().await.unwrap(), 4);
1515    /// # }));
1516    /// # }
1517    /// ```
1518    pub fn max(self) -> Optional<T, L, B>
1519    where
1520        T: Ord,
1521    {
1522        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1523            .assume_ordering_trusted_bounded::<TotalOrder>(
1524                nondet!(/** max is commutative, but order affects intermediates */),
1525            )
1526            .reduce(q!(|curr, new| {
1527                if new > *curr {
1528                    *curr = new;
1529                }
1530            }))
1531    }
1532
1533    /// Computes the minimum element in the stream as an [`Optional`], which
1534    /// will be empty until the first element in the input arrives.
1535    ///
1536    /// # Example
1537    /// ```rust
1538    /// # #[cfg(feature = "deploy")] {
1539    /// # use hydro_lang::prelude::*;
1540    /// # use futures::StreamExt;
1541    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1542    /// let tick = process.tick();
1543    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1544    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1545    /// batch.min().all_ticks()
1546    /// # }, |mut stream| async move {
1547    /// // 1
1548    /// # assert_eq!(stream.next().await.unwrap(), 1);
1549    /// # }));
1550    /// # }
1551    /// ```
1552    pub fn min(self) -> Optional<T, L, B>
1553    where
1554        T: Ord,
1555    {
1556        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1557            .assume_ordering_trusted_bounded::<TotalOrder>(
1558                nondet!(/** max is commutative, but order affects intermediates */),
1559            )
1560            .reduce(q!(|curr, new| {
1561                if new < *curr {
1562                    *curr = new;
1563                }
1564            }))
1565    }
1566
1567    /// Computes the first element in the stream as an [`Optional`], which
1568    /// will be empty until the first element in the input arrives.
1569    ///
1570    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1571    /// re-ordering of elements may cause the first element to change.
1572    ///
1573    /// # Example
1574    /// ```rust
1575    /// # #[cfg(feature = "deploy")] {
1576    /// # use hydro_lang::prelude::*;
1577    /// # use futures::StreamExt;
1578    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1579    /// let tick = process.tick();
1580    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1581    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1582    /// batch.first().all_ticks()
1583    /// # }, |mut stream| async move {
1584    /// // 1
1585    /// # assert_eq!(stream.next().await.unwrap(), 1);
1586    /// # }));
1587    /// # }
1588    /// ```
1589    pub fn first(self) -> Optional<T, L, B>
1590    where
1591        O: IsOrdered,
1592    {
1593        self.make_totally_ordered()
1594            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1595            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1596            .reduce(q!(|_, _| {}))
1597    }
1598
1599    /// Computes the last element in the stream as an [`Optional`], which
1600    /// will be empty until an element in the input arrives.
1601    ///
1602    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1603    /// re-ordering of elements may cause the last element to change.
1604    ///
1605    /// # Example
1606    /// ```rust
1607    /// # #[cfg(feature = "deploy")] {
1608    /// # use hydro_lang::prelude::*;
1609    /// # use futures::StreamExt;
1610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1611    /// let tick = process.tick();
1612    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1613    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1614    /// batch.last().all_ticks()
1615    /// # }, |mut stream| async move {
1616    /// // 4
1617    /// # assert_eq!(stream.next().await.unwrap(), 4);
1618    /// # }));
1619    /// # }
1620    /// ```
1621    pub fn last(self) -> Optional<T, L, B>
1622    where
1623        O: IsOrdered,
1624    {
1625        self.make_totally_ordered()
1626            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1627            .reduce(q!(|curr, new| *curr = new))
1628    }
1629
1630    /// Returns a stream containing at most the first `n` elements of the input stream,
1631    /// preserving the original order. Similar to `LIMIT` in SQL.
1632    ///
1633    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1634    /// retries, since the result depends on the order and cardinality of elements.
1635    ///
1636    /// # Example
1637    /// ```rust
1638    /// # #[cfg(feature = "deploy")] {
1639    /// # use hydro_lang::prelude::*;
1640    /// # use futures::StreamExt;
1641    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1642    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1643    /// numbers.limit(q!(3))
1644    /// # }, |mut stream| async move {
1645    /// // 10, 20, 30
1646    /// # for w in vec![10, 20, 30] {
1647    /// #     assert_eq!(stream.next().await.unwrap(), w);
1648    /// # }
1649    /// # }));
1650    /// # }
1651    /// ```
1652    pub fn limit(
1653        self,
1654        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1655    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1656    where
1657        O: IsOrdered,
1658        R: IsExactlyOnce,
1659    {
1660        self.generator(
1661            q!(|| 0usize),
1662            q!(move |count, item| {
1663                if *count == n {
1664                    Generate::Break
1665                } else {
1666                    *count += 1;
1667                    if *count == n {
1668                        Generate::Return(item)
1669                    } else {
1670                        Generate::Yield(item)
1671                    }
1672                }
1673            }),
1674        )
1675    }
1676
1677    /// Collects all the elements of this stream into a single [`Vec`] element.
1678    ///
1679    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1680    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1681    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1682    /// the vector at an arbitrary point in time.
1683    ///
1684    /// # Example
1685    /// ```rust
1686    /// # #[cfg(feature = "deploy")] {
1687    /// # use hydro_lang::prelude::*;
1688    /// # use futures::StreamExt;
1689    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1690    /// let tick = process.tick();
1691    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1692    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1693    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1694    /// # }, |mut stream| async move {
1695    /// // [ vec![1, 2, 3, 4] ]
1696    /// # for w in vec![vec![1, 2, 3, 4]] {
1697    /// #     assert_eq!(stream.next().await.unwrap(), w);
1698    /// # }
1699    /// # }));
1700    /// # }
1701    /// ```
1702    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1703    where
1704        O: IsOrdered,
1705        R: IsExactlyOnce,
1706    {
1707        self.make_totally_ordered().make_exactly_once().fold(
1708            q!(|| vec![]),
1709            q!(|acc, v| {
1710                acc.push(v);
1711            }),
1712        )
1713    }
1714
1715    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1716    /// and emitting each intermediate result.
1717    ///
1718    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1719    /// containing all intermediate accumulated values. The scan operation can also terminate early
1720    /// by returning `None`.
1721    ///
1722    /// The function takes a mutable reference to the accumulator and the current element, and returns
1723    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1724    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1725    ///
1726    /// # Examples
1727    ///
1728    /// Basic usage - running sum:
1729    /// ```rust
1730    /// # #[cfg(feature = "deploy")] {
1731    /// # use hydro_lang::prelude::*;
1732    /// # use futures::StreamExt;
1733    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1734    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1735    ///     q!(|| 0),
1736    ///     q!(|acc, x| {
1737    ///         *acc += x;
1738    ///         Some(*acc)
1739    ///     }),
1740    /// )
1741    /// # }, |mut stream| async move {
1742    /// // Output: 1, 3, 6, 10
1743    /// # for w in vec![1, 3, 6, 10] {
1744    /// #     assert_eq!(stream.next().await.unwrap(), w);
1745    /// # }
1746    /// # }));
1747    /// # }
1748    /// ```
1749    ///
1750    /// Early termination example:
1751    /// ```rust
1752    /// # #[cfg(feature = "deploy")] {
1753    /// # use hydro_lang::prelude::*;
1754    /// # use futures::StreamExt;
1755    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1756    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1757    ///     q!(|| 1),
1758    ///     q!(|state, x| {
1759    ///         *state = *state * x;
1760    ///         if *state > 6 {
1761    ///             None // Terminate the stream
1762    ///         } else {
1763    ///             Some(-*state)
1764    ///         }
1765    ///     }),
1766    /// )
1767    /// # }, |mut stream| async move {
1768    /// // Output: -1, -2, -6
1769    /// # for w in vec![-1, -2, -6] {
1770    /// #     assert_eq!(stream.next().await.unwrap(), w);
1771    /// # }
1772    /// # }));
1773    /// # }
1774    /// ```
1775    pub fn scan<A, U, I, F>(
1776        self,
1777        init: impl IntoQuotedMut<'a, I, L>,
1778        f: impl IntoQuotedMut<'a, F, L>,
1779    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1780    where
1781        O: IsOrdered,
1782        R: IsExactlyOnce,
1783        I: Fn() -> A + 'a,
1784        F: Fn(&mut A, T) -> Option<U> + 'a,
1785    {
1786        let init = init.splice_fn0_ctx(&self.location).into();
1787        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1788
1789        Stream::new(
1790            self.location.clone(),
1791            HydroNode::Scan {
1792                init,
1793                acc: f,
1794                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1795                metadata: self.location.new_node_metadata(
1796                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1797                ),
1798            },
1799        )
1800    }
1801
1802    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1803    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1804    /// by the function.
1805    ///
1806    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1807    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1808    /// emitted. If it resolves to `None`, the item is filtered out.
1809    ///
1810    /// # Examples
1811    ///
1812    /// ```rust
1813    /// # #[cfg(feature = "deploy")] {
1814    /// # use hydro_lang::prelude::*;
1815    /// # use futures::StreamExt;
1816    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1817    /// process
1818    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1819    ///     .scan_async_blocking(
1820    ///         q!(|| 0),
1821    ///         q!(|acc, x| {
1822    ///             *acc += x;
1823    ///             let val = *acc;
1824    ///             async move { Some(val) }
1825    ///         }),
1826    ///     )
1827    /// # }, |mut stream| async move {
1828    /// // Output: 1, 3, 6, 10
1829    /// # for w in vec![1, 3, 6, 10] {
1830    /// #     assert_eq!(stream.next().await.unwrap(), w);
1831    /// # }
1832    /// # }));
1833    /// # }
1834    /// ```
1835    pub fn scan_async_blocking<A, U, I, F, Fut>(
1836        self,
1837        init: impl IntoQuotedMut<'a, I, L>,
1838        f: impl IntoQuotedMut<'a, F, L>,
1839    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1840    where
1841        O: IsOrdered,
1842        R: IsExactlyOnce,
1843        I: Fn() -> A + 'a,
1844        F: Fn(&mut A, T) -> Fut + 'a,
1845        Fut: Future<Output = Option<U>> + 'a,
1846    {
1847        let init = init.splice_fn0_ctx(&self.location).into();
1848        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1849
1850        Stream::new(
1851            self.location.clone(),
1852            HydroNode::ScanAsyncBlocking {
1853                init,
1854                acc: f,
1855                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1856                metadata: self.location.new_node_metadata(
1857                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1858                ),
1859            },
1860        )
1861    }
1862
1863    /// Iteratively processes the elements of the stream using a state machine that can yield
1864    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1865    /// syntax in Rust, without requiring special syntax.
1866    ///
1867    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1868    /// state. The second argument defines the processing logic, taking in a mutable reference
1869    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1870    /// variants define what is emitted and whether further inputs should be processed.
1871    ///
1872    /// # Example
1873    /// ```rust
1874    /// # #[cfg(feature = "deploy")] {
1875    /// # use hydro_lang::prelude::*;
1876    /// # use futures::StreamExt;
1877    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1878    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1879    ///     q!(|| 0),
1880    ///     q!(|acc, x| {
1881    ///         *acc += x;
1882    ///         if *acc > 100 {
1883    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1884    ///         } else if *acc % 2 == 0 {
1885    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1886    ///         } else {
1887    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1888    ///         }
1889    ///     }),
1890    /// )
1891    /// # }, |mut stream| async move {
1892    /// // Output: "even", "done!"
1893    /// # let mut results = Vec::new();
1894    /// # for _ in 0..2 {
1895    /// #     results.push(stream.next().await.unwrap());
1896    /// # }
1897    /// # results.sort();
1898    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1899    /// # }));
1900    /// # }
1901    /// ```
1902    pub fn generator<A, U, I, F>(
1903        self,
1904        init: impl IntoQuotedMut<'a, I, L> + Copy,
1905        f: impl IntoQuotedMut<'a, F, L> + Copy,
1906    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1907    where
1908        O: IsOrdered,
1909        R: IsExactlyOnce,
1910        I: Fn() -> A + 'a,
1911        F: Fn(&mut A, T) -> Generate<U> + 'a,
1912    {
1913        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1914        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1915
1916        let this = self.make_totally_ordered().make_exactly_once();
1917
1918        // State is Option<Option<A>>:
1919        //   None = not yet initialized
1920        //   Some(Some(a)) = active with state a
1921        //   Some(None) = terminated
1922        let scan_init = q!(|| None)
1923            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1924            .into();
1925        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1926            if state.is_none() {
1927                *state = Some(Some(init()));
1928            }
1929            match state {
1930                Some(Some(state_value)) => match f(state_value, v) {
1931                    Generate::Yield(out) => Some(Some(out)),
1932                    Generate::Return(out) => {
1933                        *state = Some(None);
1934                        Some(Some(out))
1935                    }
1936                    // Unlike KeyedStream, we can terminate the scan directly on
1937                    // Break/Return because there is only one state (no other keys
1938                    // that still need processing).
1939                    Generate::Break => None,
1940                    Generate::Continue => Some(None),
1941                },
1942                // State is Some(None) after Return; terminate the scan.
1943                _ => None,
1944            }
1945        })
1946        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1947        .into();
1948
1949        let scan_node = HydroNode::Scan {
1950            init: scan_init,
1951            acc: scan_f,
1952            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1953            metadata: this.location.new_node_metadata(Stream::<
1954                Option<U>,
1955                L,
1956                B,
1957                TotalOrder,
1958                ExactlyOnce,
1959            >::collection_kind()),
1960        };
1961
1962        let flatten_f = q!(|d| d)
1963            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1964            .into();
1965        let flatten_node = HydroNode::FlatMap {
1966            f: flatten_f,
1967            input: Box::new(scan_node),
1968            metadata: this
1969                .location
1970                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1971        };
1972
1973        Stream::new(this.location.clone(), flatten_node)
1974    }
1975
1976    /// Given a time interval, returns a stream corresponding to samples taken from the
1977    /// stream roughly at that interval. The output will have elements in the same order
1978    /// as the input, but with arbitrary elements skipped between samples. There is also
1979    /// no guarantee on the exact timing of the samples.
1980    ///
1981    /// # Non-Determinism
1982    /// The output stream is non-deterministic in which elements are sampled, since this
1983    /// is controlled by a clock.
1984    pub fn sample_every(
1985        self,
1986        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1987        nondet: NonDet,
1988    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1989    where
1990        L: TopLevel<'a>,
1991    {
1992        let samples = self.location.source_interval(interval);
1993
1994        let tick = self.location.tick();
1995        self.batch(&tick, nondet)
1996            .filter_if(samples.batch(&tick, nondet).first().is_some())
1997            .all_ticks()
1998            .weaken_retries()
1999    }
2000
2001    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2002    /// stream has not emitted a value since that duration.
2003    ///
2004    /// # Non-Determinism
2005    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2006    /// samples take place, timeouts may be non-deterministically generated or missed,
2007    /// and the notification of the timeout may be delayed as well. There is also no
2008    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2009    /// detected based on when the next sample is taken.
2010    pub fn timeout(
2011        self,
2012        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2013        nondet: NonDet,
2014    ) -> Optional<(), L::DropConsistency, Unbounded>
2015    where
2016        L: TopLevel<'a>,
2017    {
2018        let tick = self.location.tick();
2019
2020        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2021            q!(|| None),
2022            q!(
2023                |latest, _| {
2024                    *latest = Some(Instant::now());
2025                },
2026                commutative = manual_proof!(/** TODO */)
2027            ),
2028        );
2029
2030        latest_received
2031            .snapshot(&tick, nondet)
2032            .filter_map(q!(move |latest_received| {
2033                if let Some(latest_received) = latest_received {
2034                    if Instant::now().duration_since(latest_received) > duration {
2035                        Some(())
2036                    } else {
2037                        None
2038                    }
2039                } else {
2040                    Some(())
2041                }
2042            }))
2043            .latest()
2044    }
2045
2046    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2047    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2048    ///
2049    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2050    /// processed before an acknowledgement is emitted.
2051    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2052        let id = self.location.flow_state().borrow_mut().next_clock_id();
2053        let out_location = Atomic {
2054            tick: Tick {
2055                id,
2056                l: self.location.clone(),
2057            },
2058        };
2059        Stream::new(
2060            out_location.clone(),
2061            HydroNode::BeginAtomic {
2062                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2063                metadata: out_location
2064                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2065            },
2066        )
2067    }
2068
2069    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2070    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2071    /// the order of the input. The output stream will execute in the [`Tick`] that was
2072    /// used to create the atomic section.
2073    ///
2074    /// # Non-Determinism
2075    /// The batch boundaries are non-deterministic and may change across executions.
2076    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2077        self,
2078        tick: &Tick<L2>,
2079        _nondet: NonDet,
2080    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2081        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2082        Stream::new(
2083            tick.drop_consistency(),
2084            HydroNode::Batch {
2085                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2086                metadata: tick
2087                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2088            },
2089        )
2090    }
2091
2092    /// An operator which allows you to "name" a `HydroNode`.
2093    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2094    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2095        {
2096            let mut node = self.ir_node.borrow_mut();
2097            let metadata = node.metadata_mut();
2098            metadata.tag = Some(name.to_owned());
2099        }
2100        self
2101    }
2102
2103    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2104    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2105    /// so uses must be carefully vetted.
2106    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2107    where
2108        B: IsBounded,
2109    {
2110        Optional::new(
2111            self.location.clone(),
2112            HydroNode::Cast {
2113                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2114                metadata: self
2115                    .location
2116                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2117            },
2118        )
2119    }
2120
2121    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2122        if O::ORDERING_KIND == O2::ORDERING_KIND {
2123            Stream::new(
2124                self.location.clone(),
2125                self.ir_node.replace(HydroNode::Placeholder),
2126            )
2127        } else {
2128            panic!(
2129                "Runtime ordering {:?} did not match requested cast {:?}.",
2130                O::ORDERING_KIND,
2131                O2::ORDERING_KIND
2132            )
2133        }
2134    }
2135
2136    /// Explicitly "casts" the stream to a type with a different ordering
2137    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2138    /// by the type-system.
2139    ///
2140    /// # Non-Determinism
2141    /// This function is used as an escape hatch, and any mistakes in the
2142    /// provided ordering guarantee will propagate into the guarantees
2143    /// for the rest of the program.
2144    pub fn assume_ordering<O2: Ordering>(
2145        self,
2146        _nondet: NonDet,
2147    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2148        if O::ORDERING_KIND == O2::ORDERING_KIND {
2149            self.use_ordering_type().weaken_consistency()
2150        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2151            // We can always weaken the ordering guarantee
2152            let target_location = self.location().drop_consistency();
2153            Stream::new(
2154                target_location.clone(),
2155                HydroNode::Cast {
2156                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2157                    metadata: target_location
2158                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2159                },
2160            )
2161        } else {
2162            let target_location = self.location().drop_consistency();
2163            Stream::new(
2164                target_location.clone(),
2165                HydroNode::ObserveNonDet {
2166                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2167                    trusted: false,
2168                    metadata: target_location
2169                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2170                },
2171            )
2172        }
2173    }
2174
2175    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2176    // intermediate states will not be revealed
2177    fn assume_ordering_trusted_bounded<O2: Ordering>(
2178        self,
2179        nondet: NonDet,
2180    ) -> Stream<T, L, B, O2, R> {
2181        if B::BOUNDED {
2182            self.assume_ordering_trusted(nondet)
2183        } else {
2184            let self_location = self.location.clone();
2185            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2186            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2187        }
2188    }
2189
2190    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2191    // is not observable
2192    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2193        self,
2194        _nondet: NonDet,
2195    ) -> Stream<T, L, B, O2, R> {
2196        if O::ORDERING_KIND == O2::ORDERING_KIND {
2197            self.use_ordering_type()
2198        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2199            // We can always weaken the ordering guarantee
2200            Stream::new(
2201                self.location.clone(),
2202                HydroNode::Cast {
2203                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2204                    metadata: self
2205                        .location
2206                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2207                },
2208            )
2209        } else {
2210            Stream::new(
2211                self.location.clone(),
2212                HydroNode::ObserveNonDet {
2213                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2214                    trusted: true,
2215                    metadata: self
2216                        .location
2217                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2218                },
2219            )
2220        }
2221    }
2222
2223    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2224    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2225    /// which is always safe because that is the weakest possible guarantee.
2226    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2227        self.weaken_ordering::<NoOrder>()
2228    }
2229
2230    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2231    /// enforcing that `O2` is weaker than the input ordering guarantee.
2232    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2233        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2234        self.assume_ordering_trusted::<O2>(nondet)
2235    }
2236
2237    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2238    /// implies that `O == TotalOrder`.
2239    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2240    where
2241        O: IsOrdered,
2242    {
2243        self.assume_ordering_trusted(nondet!(/** no-op */))
2244    }
2245
2246    /// Explicitly "casts" the stream to a type with a different retries
2247    /// guarantee. Useful in unsafe code where the lack of retries cannot
2248    /// be proven by the type-system.
2249    ///
2250    /// # Non-Determinism
2251    /// This function is used as an escape hatch, and any mistakes in the
2252    /// provided retries guarantee will propagate into the guarantees
2253    /// for the rest of the program.
2254    pub fn assume_retries<R2: Retries>(
2255        self,
2256        _nondet: NonDet,
2257    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2258        if R::RETRIES_KIND == R2::RETRIES_KIND {
2259            Stream::new(
2260                self.location.drop_consistency(),
2261                self.ir_node.replace(HydroNode::Placeholder),
2262            )
2263        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2264            // We can always weaken the retries guarantee
2265            let target_location = self.location.drop_consistency();
2266            Stream::new(
2267                target_location.clone(),
2268                HydroNode::Cast {
2269                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2270                    metadata: target_location
2271                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2272                },
2273            )
2274        } else {
2275            let target_location = self.location.drop_consistency();
2276            Stream::new(
2277                target_location.clone(),
2278                HydroNode::ObserveNonDet {
2279                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2280                    trusted: false,
2281                    metadata: target_location
2282                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2283                },
2284            )
2285        }
2286    }
2287
2288    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2289    // is not observable
2290    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2291        if R::RETRIES_KIND == R2::RETRIES_KIND {
2292            Stream::new(
2293                self.location.clone(),
2294                self.ir_node.replace(HydroNode::Placeholder),
2295            )
2296        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2297            // We can always weaken the retries guarantee
2298            Stream::new(
2299                self.location.clone(),
2300                HydroNode::Cast {
2301                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2302                    metadata: self
2303                        .location
2304                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2305                },
2306            )
2307        } else {
2308            Stream::new(
2309                self.location.clone(),
2310                HydroNode::ObserveNonDet {
2311                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2312                    trusted: true,
2313                    metadata: self
2314                        .location
2315                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2316                },
2317            )
2318        }
2319    }
2320
2321    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2322    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2323    /// which is always safe because that is the weakest possible guarantee.
2324    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2325        self.weaken_retries::<AtLeastOnce>()
2326    }
2327
2328    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2329    /// enforcing that `R2` is weaker than the input retries guarantee.
2330    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2331        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2332        self.assume_retries_trusted::<R2>(nondet)
2333    }
2334
2335    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2336    /// implies that `R == ExactlyOnce`.
2337    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2338    where
2339        R: IsExactlyOnce,
2340    {
2341        self.assume_retries_trusted(nondet!(/** no-op */))
2342    }
2343
2344    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2345    /// implies that `B == Bounded`.
2346    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2347    where
2348        B: IsBounded,
2349    {
2350        self.weaken_boundedness()
2351    }
2352
2353    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2354    /// which implies that `B == Bounded`.
2355    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2356        if B::BOUNDED == B2::BOUNDED {
2357            Stream::new(
2358                self.location.clone(),
2359                self.ir_node.replace(HydroNode::Placeholder),
2360            )
2361        } else {
2362            // We can always weaken the boundedness
2363            Stream::new(
2364                self.location.clone(),
2365                HydroNode::Cast {
2366                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2367                    metadata: self
2368                        .location
2369                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2370                },
2371            )
2372        }
2373    }
2374}
2375
2376impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2377where
2378    L: Location<'a>,
2379{
2380    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2381    ///
2382    /// # Example
2383    /// ```rust
2384    /// # #[cfg(feature = "deploy")] {
2385    /// # use hydro_lang::prelude::*;
2386    /// # use futures::StreamExt;
2387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2388    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2389    /// # }, |mut stream| async move {
2390    /// // 1, 2, 3
2391    /// # for w in vec![1, 2, 3] {
2392    /// #     assert_eq!(stream.next().await.unwrap(), w);
2393    /// # }
2394    /// # }));
2395    /// # }
2396    /// ```
2397    pub fn cloned(self) -> Stream<T, L, B, O, R>
2398    where
2399        T: Clone,
2400    {
2401        self.map(q!(|d| d.clone()))
2402    }
2403}
2404
2405impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2406where
2407    L: Location<'a>,
2408{
2409    /// Computes the number of elements in the stream as a [`Singleton`].
2410    ///
2411    /// # Example
2412    /// ```rust
2413    /// # #[cfg(feature = "deploy")] {
2414    /// # use hydro_lang::prelude::*;
2415    /// # use futures::StreamExt;
2416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2417    /// let tick = process.tick();
2418    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2419    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2420    /// batch.count().all_ticks()
2421    /// # }, |mut stream| async move {
2422    /// // 4
2423    /// # assert_eq!(stream.next().await.unwrap(), 4);
2424    /// # }));
2425    /// # }
2426    /// ```
2427    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2428        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2429            /// Order does not affect eventual count, and also does not affect intermediate states.
2430        ))
2431        .fold(
2432            q!(|| 0usize),
2433            q!(
2434                |count, _| *count += 1,
2435                monotone = manual_proof!(/** += 1 is monotone */)
2436            ),
2437        )
2438    }
2439}
2440
2441impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2442    /// Produces a new stream that merges the elements of the two input streams.
2443    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2444    ///
2445    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2446    /// [`Bounded`], you can use [`Stream::chain`] instead.
2447    ///
2448    /// # Example
2449    /// ```rust
2450    /// # #[cfg(feature = "deploy")] {
2451    /// # use hydro_lang::prelude::*;
2452    /// # use futures::StreamExt;
2453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2454    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2455    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2456    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2457    /// # }, |mut stream| async move {
2458    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2459    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2460    /// #     assert_eq!(stream.next().await.unwrap(), w);
2461    /// # }
2462    /// # }));
2463    /// # }
2464    /// ```
2465    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2466        self,
2467        other: Stream<T, L, Unbounded, O2, R2>,
2468    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2469    where
2470        R: MinRetries<R2>,
2471    {
2472        Stream::new(
2473            self.location.clone(),
2474            HydroNode::Chain {
2475                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2476                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2477                metadata: self.location.new_node_metadata(Stream::<
2478                    T,
2479                    L,
2480                    Unbounded,
2481                    NoOrder,
2482                    <R as MinRetries<R2>>::Min,
2483                >::collection_kind()),
2484            },
2485        )
2486    }
2487
2488    /// Deprecated: use [`Stream::merge_unordered`] instead.
2489    #[deprecated(note = "use `merge_unordered` instead")]
2490    pub fn interleave<O2: Ordering, R2: Retries>(
2491        self,
2492        other: Stream<T, L, Unbounded, O2, R2>,
2493    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2494    where
2495        R: MinRetries<R2>,
2496    {
2497        self.merge_unordered(other)
2498    }
2499}
2500
2501impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2502    /// Produces a new stream that combines the elements of the two input streams,
2503    /// preserving the relative order of elements within each input.
2504    ///
2505    /// # Non-Determinism
2506    /// The order in which elements *across* the two streams will be interleaved is
2507    /// non-deterministic, so the order of elements will vary across runs. If the output
2508    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2509    /// but emits an unordered stream. For deterministic first-then-second ordering on
2510    /// bounded streams, use [`Stream::chain`].
2511    ///
2512    /// # Example
2513    /// ```rust
2514    /// # #[cfg(feature = "deploy")] {
2515    /// # use hydro_lang::prelude::*;
2516    /// # use futures::StreamExt;
2517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2519    /// # process.source_iter(q!(vec![1, 3])).into();
2520    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2521    /// # }, |mut stream| async move {
2522    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2523    /// # for w in vec![1, 3, 2, 4] {
2524    /// #     assert_eq!(stream.next().await.unwrap(), w);
2525    /// # }
2526    /// # }));
2527    /// # }
2528    /// ```
2529    pub fn merge_ordered<R2: Retries>(
2530        self,
2531        other: Stream<T, L, B, TotalOrder, R2>,
2532        _nondet: NonDet,
2533    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2534    where
2535        R: MinRetries<R2>,
2536    {
2537        let target_location = self.location().drop_consistency();
2538        Stream::new(
2539            target_location.clone(),
2540            HydroNode::MergeOrdered {
2541                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2542                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2543                metadata: target_location.new_node_metadata(Stream::<
2544                    T,
2545                    L::DropConsistency,
2546                    B,
2547                    TotalOrder,
2548                    <R as MinRetries<R2>>::Min,
2549                >::collection_kind()),
2550            },
2551        )
2552    }
2553}
2554
2555impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2556where
2557    L: Location<'a>,
2558{
2559    /// Produces a new stream that emits the input elements in sorted order.
2560    ///
2561    /// The input stream can have any ordering guarantee, but the output stream
2562    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2563    /// elements in the input stream are available, so it requires the input stream
2564    /// to be [`Bounded`].
2565    ///
2566    /// # Example
2567    /// ```rust
2568    /// # #[cfg(feature = "deploy")] {
2569    /// # use hydro_lang::prelude::*;
2570    /// # use futures::StreamExt;
2571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2572    /// let tick = process.tick();
2573    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2574    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2575    /// batch.sort().all_ticks()
2576    /// # }, |mut stream| async move {
2577    /// // 1, 2, 3, 4
2578    /// # for w in (1..5) {
2579    /// #     assert_eq!(stream.next().await.unwrap(), w);
2580    /// # }
2581    /// # }));
2582    /// # }
2583    /// ```
2584    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2585    where
2586        B: IsBounded,
2587        T: Ord,
2588    {
2589        let this = self.make_bounded();
2590        Stream::new(
2591            this.location.clone(),
2592            HydroNode::Sort {
2593                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2594                metadata: this
2595                    .location
2596                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2597            },
2598        )
2599    }
2600
2601    /// Produces a new stream that first emits the elements of the `self` stream,
2602    /// and then emits the elements of the `other` stream. The output stream has
2603    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2604    /// [`TotalOrder`] guarantee.
2605    ///
2606    /// Currently, both input streams must be [`Bounded`]. This operator will block
2607    /// on the first stream until all its elements are available. In a future version,
2608    /// we will relax the requirement on the `other` stream.
2609    ///
2610    /// # Example
2611    /// ```rust
2612    /// # #[cfg(feature = "deploy")] {
2613    /// # use hydro_lang::prelude::*;
2614    /// # use futures::StreamExt;
2615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2616    /// let tick = process.tick();
2617    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2618    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2619    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2620    /// # }, |mut stream| async move {
2621    /// // 2, 3, 4, 5, 1, 2, 3, 4
2622    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2623    /// #     assert_eq!(stream.next().await.unwrap(), w);
2624    /// # }
2625    /// # }));
2626    /// # }
2627    /// ```
2628    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2629        self,
2630        other: Stream<T, L, B2, O2, R2>,
2631    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2632    where
2633        B: IsBounded,
2634        O: MinOrder<O2>,
2635        R: MinRetries<R2>,
2636    {
2637        check_matching_location(&self.location, &other.location);
2638
2639        Stream::new(
2640            self.location.clone(),
2641            HydroNode::Chain {
2642                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2643                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2644                metadata: self.location.new_node_metadata(Stream::<
2645                    T,
2646                    L,
2647                    B2,
2648                    <O as MinOrder<O2>>::Min,
2649                    <R as MinRetries<R2>>::Min,
2650                >::collection_kind()),
2651            },
2652        )
2653    }
2654
2655    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2656    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2657    /// because this is compiled into a nested loop.
2658    #[expect(
2659        clippy::type_complexity,
2660        reason = "MinRetries projection in return type"
2661    )]
2662    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2663        self,
2664        other: Stream<T2, L, Bounded, O2, R2>,
2665    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2666    where
2667        B: IsBounded,
2668        T: Clone,
2669        T2: Clone,
2670        R: MinRetries<R2>,
2671    {
2672        let this = self.make_bounded();
2673        check_matching_location(&this.location, &other.location);
2674
2675        Stream::new(
2676            this.location.clone(),
2677            HydroNode::CrossProduct {
2678                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2679                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2680                metadata: this.location.new_node_metadata(Stream::<
2681                    (T, T2),
2682                    L,
2683                    Bounded,
2684                    <O2 as MinOrder<O>>::Min,
2685                    <R as MinRetries<R2>>::Min,
2686                >::collection_kind()),
2687            },
2688        )
2689    }
2690
2691    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2692    /// `self` used as the values for *each* key.
2693    ///
2694    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2695    /// values. For example, it can be used to send the same set of elements to several cluster
2696    /// members, if the membership information is available as a [`KeyedSingleton`].
2697    ///
2698    /// # Example
2699    /// ```rust
2700    /// # #[cfg(feature = "deploy")] {
2701    /// # use hydro_lang::prelude::*;
2702    /// # use futures::StreamExt;
2703    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2704    /// # let tick = process.tick();
2705    /// let keyed_singleton = // { 1: (), 2: () }
2706    /// # process
2707    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2708    /// #     .into_keyed()
2709    /// #     .batch(&tick, nondet!(/** test */))
2710    /// #     .first();
2711    /// let stream = // [ "a", "b" ]
2712    /// # process
2713    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2714    /// #     .batch(&tick, nondet!(/** test */));
2715    /// stream.repeat_with_keys(keyed_singleton)
2716    /// # .entries().all_ticks()
2717    /// # }, |mut stream| async move {
2718    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2719    /// # let mut results = Vec::new();
2720    /// # for _ in 0..4 {
2721    /// #     results.push(stream.next().await.unwrap());
2722    /// # }
2723    /// # results.sort();
2724    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2725    /// # }));
2726    /// # }
2727    /// ```
2728    pub fn repeat_with_keys<K, V2>(
2729        self,
2730        keys: KeyedSingleton<K, V2, L, Bounded>,
2731    ) -> KeyedStream<K, T, L, Bounded, O, R>
2732    where
2733        B: IsBounded,
2734        K: Clone,
2735        T: Clone,
2736    {
2737        keys.keys()
2738            .assume_ordering_trusted::<TotalOrder>(
2739                nondet!(/** keyed stream does not depend on ordering of keys */),
2740            )
2741            .cross_product_nested_loop(self.make_bounded())
2742            .into_keyed()
2743    }
2744
2745    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2746    /// execution until all results are available. The output order is based on when futures
2747    /// complete, and may be different than the input order.
2748    ///
2749    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2750    /// while futures are pending, this variant blocks until the futures resolve.
2751    ///
2752    /// # Example
2753    /// ```rust
2754    /// # #[cfg(feature = "deploy")] {
2755    /// # use std::collections::HashSet;
2756    /// # use futures::StreamExt;
2757    /// # use hydro_lang::prelude::*;
2758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2759    /// process
2760    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2761    ///     .map(q!(|x| async move {
2762    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2763    ///         x
2764    ///     }))
2765    ///     .resolve_futures_blocking()
2766    /// #   },
2767    /// #   |mut stream| async move {
2768    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2769    /// #       let mut output = HashSet::new();
2770    /// #       for _ in 1..10 {
2771    /// #           output.insert(stream.next().await.unwrap());
2772    /// #       }
2773    /// #       assert_eq!(
2774    /// #           output,
2775    /// #           HashSet::<i32>::from_iter(1..10)
2776    /// #       );
2777    /// #   },
2778    /// # ));
2779    /// # }
2780    /// ```
2781    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2782    where
2783        T: Future,
2784    {
2785        Stream::new(
2786            self.location.clone(),
2787            HydroNode::ResolveFuturesBlocking {
2788                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2789                metadata: self
2790                    .location
2791                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2792            },
2793        )
2794    }
2795
2796    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2797    ///
2798    /// # Example
2799    /// ```rust
2800    /// # #[cfg(feature = "deploy")] {
2801    /// # use hydro_lang::prelude::*;
2802    /// # use futures::StreamExt;
2803    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2804    /// let tick = process.tick();
2805    /// let empty: Stream<i32, _, Bounded> = process
2806    ///   .source_iter(q!(Vec::<i32>::new()))
2807    ///   .batch(&tick, nondet!(/** test */));
2808    /// empty.is_empty().all_ticks()
2809    /// # }, |mut stream| async move {
2810    /// // true
2811    /// # assert_eq!(stream.next().await.unwrap(), true);
2812    /// # }));
2813    /// # }
2814    /// ```
2815    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2816    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2817    where
2818        B: IsBounded,
2819    {
2820        self.make_bounded()
2821            .assume_ordering_trusted::<TotalOrder>(
2822                nondet!(/** is_empty intermediates unaffected by order */),
2823            )
2824            .first()
2825            .is_none()
2826    }
2827}
2828
2829impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2830where
2831    L: Location<'a>,
2832{
2833    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2834    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2835    /// by equi-joining the two streams on the key attribute `K`.
2836    ///
2837    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2838    /// and streams the left side through, preserving the left side's ordering. When both
2839    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2840    ///
2841    /// # Example
2842    /// ```rust
2843    /// # #[cfg(feature = "deploy")] {
2844    /// # use hydro_lang::prelude::*;
2845    /// # use std::collections::HashSet;
2846    /// # use futures::StreamExt;
2847    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2848    /// let tick = process.tick();
2849    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2850    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2851    /// stream1.join(stream2)
2852    /// # }, |mut stream| async move {
2853    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2854    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2855    /// # stream.map(|i| assert!(expected.contains(&i)));
2856    /// # }));
2857    /// # }
2858    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2859        self,
2860        n: Stream<(K, V2), L, B2, O2, R2>,
2861    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2862    where
2863        K: Eq + Hash + Clone,
2864        R: MinRetries<R2>,
2865        V1: Clone,
2866        V2: Clone,
2867    {
2868        check_matching_location(&self.location, &n.location);
2869
2870        let ir_node = if B2::BOUNDED {
2871            HydroNode::JoinHalf {
2872                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2873                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2874                metadata: self.location.new_node_metadata(Stream::<
2875                    (K, (V1, V2)),
2876                    L,
2877                    B,
2878                    B2::PreserveOrderIfBounded<O>,
2879                    <R as MinRetries<R2>>::Min,
2880                >::collection_kind()),
2881            }
2882        } else {
2883            HydroNode::Join {
2884                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2885                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2886                metadata: self.location.new_node_metadata(Stream::<
2887                    (K, (V1, V2)),
2888                    L,
2889                    B,
2890                    B2::PreserveOrderIfBounded<O>,
2891                    <R as MinRetries<R2>>::Min,
2892                >::collection_kind()),
2893            }
2894        };
2895
2896        Stream::new(self.location.clone(), ir_node)
2897    }
2898
2899    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2900    /// computes the anti-join of the items in the input -- i.e. returns
2901    /// unique items in the first input that do not have a matching key
2902    /// in the second input.
2903    ///
2904    /// # Example
2905    /// ```rust
2906    /// # #[cfg(feature = "deploy")] {
2907    /// # use hydro_lang::prelude::*;
2908    /// # use futures::StreamExt;
2909    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2910    /// let tick = process.tick();
2911    /// let stream = process
2912    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2913    ///   .batch(&tick, nondet!(/** test */));
2914    /// let batch = process
2915    ///   .source_iter(q!(vec![1, 2]))
2916    ///   .batch(&tick, nondet!(/** test */));
2917    /// stream.anti_join(batch).all_ticks()
2918    /// # }, |mut stream| async move {
2919    /// # for w in vec![(3, 'c'), (4, 'd')] {
2920    /// #     assert_eq!(stream.next().await.unwrap(), w);
2921    /// # }
2922    /// # }));
2923    /// # }
2924    pub fn anti_join<O2: Ordering, R2: Retries>(
2925        self,
2926        n: Stream<K, L, Bounded, O2, R2>,
2927    ) -> Stream<(K, V1), L, B, O, R>
2928    where
2929        K: Eq + Hash,
2930    {
2931        check_matching_location(&self.location, &n.location);
2932
2933        Stream::new(
2934            self.location.clone(),
2935            HydroNode::AntiJoin {
2936                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2937                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2938                metadata: self
2939                    .location
2940                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2941            },
2942        )
2943    }
2944}
2945
2946impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2947    Stream<(K, V), L, B, O, R>
2948{
2949    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2950    /// is used as the key and the second element is added to the entries associated with that key.
2951    ///
2952    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2953    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2954    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2955    /// total ordering _within_ each group but no ordering _across_ groups.
2956    ///
2957    /// # Example
2958    /// ```rust
2959    /// # #[cfg(feature = "deploy")] {
2960    /// # use hydro_lang::prelude::*;
2961    /// # use futures::StreamExt;
2962    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2963    /// process
2964    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2965    ///     .into_keyed()
2966    /// #   .entries()
2967    /// # }, |mut stream| async move {
2968    /// // { 1: [2, 3], 2: [4] }
2969    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2970    /// #     assert_eq!(stream.next().await.unwrap(), w);
2971    /// # }
2972    /// # }));
2973    /// # }
2974    /// ```
2975    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2976        KeyedStream::new(
2977            self.location.clone(),
2978            HydroNode::Cast {
2979                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2980                metadata: self
2981                    .location
2982                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2983            },
2984        )
2985    }
2986}
2987
2988impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2989where
2990    K: Eq + Hash,
2991    L: Location<'a>,
2992{
2993    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2994    /// # Example
2995    /// ```rust
2996    /// # #[cfg(feature = "deploy")] {
2997    /// # use hydro_lang::prelude::*;
2998    /// # use futures::StreamExt;
2999    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3000    /// let tick = process.tick();
3001    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
3002    /// let batch = numbers.batch(&tick, nondet!(/** test */));
3003    /// batch.keys().all_ticks()
3004    /// # }, |mut stream| async move {
3005    /// // 1, 2
3006    /// # assert_eq!(stream.next().await.unwrap(), 1);
3007    /// # assert_eq!(stream.next().await.unwrap(), 2);
3008    /// # }));
3009    /// # }
3010    /// ```
3011    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3012        self.into_keyed()
3013            .fold(
3014                q!(|| ()),
3015                q!(
3016                    |_, _| {},
3017                    commutative = manual_proof!(/** values are ignored */),
3018                    idempotent = manual_proof!(/** values are ignored */)
3019                ),
3020            )
3021            .keys()
3022    }
3023}
3024
3025impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3026where
3027    L: Location<'a>,
3028{
3029    /// Returns a stream corresponding to the latest batch of elements being atomically
3030    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
3031    /// the order of the input.
3032    ///
3033    /// # Non-Determinism
3034    /// The batch boundaries are non-deterministic and may change across executions.
3035    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3036        self,
3037        tick: &Tick<L2>,
3038        _nondet: NonDet,
3039    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3040        Stream::new(
3041            tick.drop_consistency(),
3042            HydroNode::Batch {
3043                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3044                metadata: tick
3045                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3046            },
3047        )
3048    }
3049
3050    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
3051    /// See [`Stream::atomic`] for more details.
3052    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3053        Stream::new(
3054            self.location.tick.l.clone(),
3055            HydroNode::EndAtomic {
3056                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3057                metadata: self
3058                    .location
3059                    .tick
3060                    .l
3061                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3062            },
3063        )
3064    }
3065}
3066
3067impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3068where
3069    L: TopLevel<'a>,
3070    F: Future<Output = T>,
3071{
3072    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3073    /// Future outputs are produced as available, regardless of input arrival order.
3074    ///
3075    /// # Example
3076    /// ```rust
3077    /// # #[cfg(feature = "deploy")] {
3078    /// # use std::collections::HashSet;
3079    /// # use futures::StreamExt;
3080    /// # use hydro_lang::prelude::*;
3081    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3082    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3083    ///     .map(q!(|x| async move {
3084    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3085    ///         x
3086    ///     }))
3087    ///     .resolve_futures()
3088    /// #   },
3089    /// #   |mut stream| async move {
3090    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3091    /// #       let mut output = HashSet::new();
3092    /// #       for _ in 1..10 {
3093    /// #           output.insert(stream.next().await.unwrap());
3094    /// #       }
3095    /// #       assert_eq!(
3096    /// #           output,
3097    /// #           HashSet::<i32>::from_iter(1..10)
3098    /// #       );
3099    /// #   },
3100    /// # ));
3101    /// # }
3102    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3103        Stream::new(
3104            self.location.clone(),
3105            HydroNode::ResolveFutures {
3106                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3107                metadata: self
3108                    .location
3109                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3110            },
3111        )
3112    }
3113
3114    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3115    /// Future outputs are produced in the same order as the input stream.
3116    ///
3117    /// # Example
3118    /// ```rust
3119    /// # #[cfg(feature = "deploy")] {
3120    /// # use std::collections::HashSet;
3121    /// # use futures::StreamExt;
3122    /// # use hydro_lang::prelude::*;
3123    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3124    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3125    ///     .map(q!(|x| async move {
3126    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3127    ///         x
3128    ///     }))
3129    ///     .resolve_futures_ordered()
3130    /// #   },
3131    /// #   |mut stream| async move {
3132    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3133    /// #       let mut output = Vec::new();
3134    /// #       for _ in 1..10 {
3135    /// #           output.push(stream.next().await.unwrap());
3136    /// #       }
3137    /// #       assert_eq!(
3138    /// #           output,
3139    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3140    /// #       );
3141    /// #   },
3142    /// # ));
3143    /// # }
3144    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3145        Stream::new(
3146            self.location.clone(),
3147            HydroNode::ResolveFuturesOrdered {
3148                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3149                metadata: self
3150                    .location
3151                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3152            },
3153        )
3154    }
3155}
3156
3157impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3158where
3159    L: Location<'a>,
3160{
3161    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3162    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3163    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3164        Stream::new(
3165            self.location.outer().clone(),
3166            HydroNode::YieldConcat {
3167                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3168                metadata: self
3169                    .location
3170                    .outer()
3171                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3172            },
3173        )
3174    }
3175
3176    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3177    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3178    ///
3179    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3180    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3181    /// stream's [`Tick`] context.
3182    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3183        let out_location = Atomic {
3184            tick: self.location.clone(),
3185        };
3186
3187        Stream::new(
3188            out_location.clone(),
3189            HydroNode::YieldConcat {
3190                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3191                metadata: out_location
3192                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3193            },
3194        )
3195    }
3196
3197    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3198    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3199    /// input.
3200    ///
3201    /// This API is particularly useful for stateful computation on batches of data, such as
3202    /// maintaining an accumulated state that is up to date with the current batch.
3203    ///
3204    /// # Example
3205    /// ```rust
3206    /// # #[cfg(feature = "deploy")] {
3207    /// # use hydro_lang::prelude::*;
3208    /// # use futures::StreamExt;
3209    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3210    /// let tick = process.tick();
3211    /// # // ticks are lazy by default, forces the second tick to run
3212    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3213    /// # let batch_first_tick = process
3214    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3215    /// #  .batch(&tick, nondet!(/** test */));
3216    /// # let batch_second_tick = process
3217    /// #   .source_iter(q!(vec![5, 6, 7]))
3218    /// #   .batch(&tick, nondet!(/** test */))
3219    /// #   .defer_tick(); // appears on the second tick
3220    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3221    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3222    ///
3223    /// input.batch(&tick, nondet!(/** test */))
3224    ///     .across_ticks(|s| s.count()).all_ticks()
3225    /// # }, |mut stream| async move {
3226    /// // [4, 7]
3227    /// assert_eq!(stream.next().await.unwrap(), 4);
3228    /// assert_eq!(stream.next().await.unwrap(), 7);
3229    /// # }));
3230    /// # }
3231    /// ```
3232    pub fn across_ticks<Out: BatchAtomic<'a>>(
3233        self,
3234        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3235    ) -> Out::Batched {
3236        thunk(self.all_ticks_atomic()).batched_atomic()
3237    }
3238
3239    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3240    /// always has the elements of `self` at tick `T - 1`.
3241    ///
3242    /// At tick `0`, the output stream is empty, since there is no previous tick.
3243    ///
3244    /// This operator enables stateful iterative processing with ticks, by sending data from one
3245    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3246    ///
3247    /// # Example
3248    /// ```rust
3249    /// # #[cfg(feature = "deploy")] {
3250    /// # use hydro_lang::prelude::*;
3251    /// # use futures::StreamExt;
3252    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3253    /// let tick = process.tick();
3254    /// // ticks are lazy by default, forces the second tick to run
3255    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3256    ///
3257    /// let batch_first_tick = process
3258    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3259    ///   .batch(&tick, nondet!(/** test */));
3260    /// let batch_second_tick = process
3261    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3262    ///   .batch(&tick, nondet!(/** test */))
3263    ///   .defer_tick(); // appears on the second tick
3264    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3265    ///
3266    /// changes_across_ticks.clone().filter_not_in(
3267    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3268    /// ).all_ticks()
3269    /// # }, |mut stream| async move {
3270    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3271    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3272    /// #     assert_eq!(stream.next().await.unwrap(), w);
3273    /// # }
3274    /// # }));
3275    /// # }
3276    /// ```
3277    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3278        Stream::new(
3279            self.location.clone(),
3280            HydroNode::DeferTick {
3281                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3282                metadata: self
3283                    .location
3284                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3285            },
3286        )
3287    }
3288}
3289
3290#[cfg(test)]
3291mod tests {
3292    #[cfg(feature = "deploy")]
3293    use futures::{SinkExt, StreamExt};
3294    #[cfg(feature = "deploy")]
3295    use hydro_deploy::Deployment;
3296    #[cfg(feature = "deploy")]
3297    use serde::{Deserialize, Serialize};
3298    #[cfg(any(feature = "deploy", feature = "sim"))]
3299    use stageleft::q;
3300
3301    #[cfg(any(feature = "deploy", feature = "sim"))]
3302    use crate::compile::builder::FlowBuilder;
3303    #[cfg(feature = "deploy")]
3304    use crate::live_collections::sliced::sliced;
3305    #[cfg(feature = "deploy")]
3306    use crate::live_collections::stream::ExactlyOnce;
3307    #[cfg(feature = "sim")]
3308    use crate::live_collections::stream::NoOrder;
3309    #[cfg(any(feature = "deploy", feature = "sim"))]
3310    use crate::live_collections::stream::TotalOrder;
3311    #[cfg(any(feature = "deploy", feature = "sim"))]
3312    use crate::location::Location;
3313    #[cfg(feature = "sim")]
3314    use crate::networking::TCP;
3315    #[cfg(any(feature = "deploy", feature = "sim"))]
3316    use crate::nondet::nondet;
3317
3318    mod backtrace_chained_ops;
3319
3320    #[cfg(feature = "deploy")]
3321    struct P1 {}
3322    #[cfg(feature = "deploy")]
3323    struct P2 {}
3324
3325    #[cfg(feature = "deploy")]
3326    #[derive(Serialize, Deserialize, Debug)]
3327    struct SendOverNetwork {
3328        n: u32,
3329    }
3330
3331    #[cfg(feature = "deploy")]
3332    #[tokio::test]
3333    async fn first_ten_distributed() {
3334        use crate::networking::TCP;
3335
3336        let mut deployment = Deployment::new();
3337
3338        let mut flow = FlowBuilder::new();
3339        let first_node = flow.process::<P1>();
3340        let second_node = flow.process::<P2>();
3341        let external = flow.external::<P2>();
3342
3343        let numbers = first_node.source_iter(q!(0..10));
3344        let out_port = numbers
3345            .map(q!(|n| SendOverNetwork { n }))
3346            .send(&second_node, TCP.fail_stop().bincode())
3347            .send_bincode_external(&external);
3348
3349        let nodes = flow
3350            .with_process(&first_node, deployment.Localhost())
3351            .with_process(&second_node, deployment.Localhost())
3352            .with_external(&external, deployment.Localhost())
3353            .deploy(&mut deployment);
3354
3355        deployment.deploy().await.unwrap();
3356
3357        let mut external_out = nodes.connect(out_port).await;
3358
3359        deployment.start().await.unwrap();
3360
3361        for i in 0..10 {
3362            assert_eq!(external_out.next().await.unwrap().n, i);
3363        }
3364    }
3365
3366    #[cfg(feature = "deploy")]
3367    #[tokio::test]
3368    async fn first_cardinality() {
3369        let mut deployment = Deployment::new();
3370
3371        let mut flow = FlowBuilder::new();
3372        let node = flow.process::<()>();
3373        let external = flow.external::<()>();
3374
3375        let node_tick = node.tick();
3376        let count = node_tick
3377            .singleton(q!([1, 2, 3]))
3378            .into_stream()
3379            .flatten_ordered()
3380            .first()
3381            .into_stream()
3382            .count()
3383            .all_ticks()
3384            .send_bincode_external(&external);
3385
3386        let nodes = flow
3387            .with_process(&node, deployment.Localhost())
3388            .with_external(&external, deployment.Localhost())
3389            .deploy(&mut deployment);
3390
3391        deployment.deploy().await.unwrap();
3392
3393        let mut external_out = nodes.connect(count).await;
3394
3395        deployment.start().await.unwrap();
3396
3397        assert_eq!(external_out.next().await.unwrap(), 1);
3398    }
3399
3400    #[cfg(feature = "deploy")]
3401    #[tokio::test]
3402    async fn unbounded_reduce_remembers_state() {
3403        let mut deployment = Deployment::new();
3404
3405        let mut flow = FlowBuilder::new();
3406        let node = flow.process::<()>();
3407        let external = flow.external::<()>();
3408
3409        let (input_port, input) = node.source_external_bincode(&external);
3410        let out = input
3411            .reduce(q!(|acc, v| *acc += v))
3412            .sample_eager(nondet!(/** test */))
3413            .send_bincode_external(&external);
3414
3415        let nodes = flow
3416            .with_process(&node, deployment.Localhost())
3417            .with_external(&external, deployment.Localhost())
3418            .deploy(&mut deployment);
3419
3420        deployment.deploy().await.unwrap();
3421
3422        let mut external_in = nodes.connect(input_port).await;
3423        let mut external_out = nodes.connect(out).await;
3424
3425        deployment.start().await.unwrap();
3426
3427        external_in.send(1).await.unwrap();
3428        assert_eq!(external_out.next().await.unwrap(), 1);
3429
3430        external_in.send(2).await.unwrap();
3431        assert_eq!(external_out.next().await.unwrap(), 3);
3432    }
3433
3434    #[cfg(feature = "deploy")]
3435    #[tokio::test]
3436    async fn top_level_bounded_cross_singleton() {
3437        let mut deployment = Deployment::new();
3438
3439        let mut flow = FlowBuilder::new();
3440        let node = flow.process::<()>();
3441        let external = flow.external::<()>();
3442
3443        let (input_port, input) =
3444            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3445
3446        let out = input
3447            .cross_singleton(
3448                node.source_iter(q!(vec![1, 2, 3]))
3449                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3450            )
3451            .send_bincode_external(&external);
3452
3453        let nodes = flow
3454            .with_process(&node, deployment.Localhost())
3455            .with_external(&external, deployment.Localhost())
3456            .deploy(&mut deployment);
3457
3458        deployment.deploy().await.unwrap();
3459
3460        let mut external_in = nodes.connect(input_port).await;
3461        let mut external_out = nodes.connect(out).await;
3462
3463        deployment.start().await.unwrap();
3464
3465        external_in.send(1).await.unwrap();
3466        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3467
3468        external_in.send(2).await.unwrap();
3469        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3470    }
3471
3472    #[cfg(feature = "deploy")]
3473    #[tokio::test]
3474    async fn top_level_bounded_reduce_cardinality() {
3475        let mut deployment = Deployment::new();
3476
3477        let mut flow = FlowBuilder::new();
3478        let node = flow.process::<()>();
3479        let external = flow.external::<()>();
3480
3481        let (input_port, input) =
3482            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3483
3484        let out = sliced! {
3485            let input = use(input, nondet!(/** test */));
3486            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3487            input.cross_singleton(v.into_stream().count())
3488        }
3489        .send_bincode_external(&external);
3490
3491        let nodes = flow
3492            .with_process(&node, deployment.Localhost())
3493            .with_external(&external, deployment.Localhost())
3494            .deploy(&mut deployment);
3495
3496        deployment.deploy().await.unwrap();
3497
3498        let mut external_in = nodes.connect(input_port).await;
3499        let mut external_out = nodes.connect(out).await;
3500
3501        deployment.start().await.unwrap();
3502
3503        external_in.send(1).await.unwrap();
3504        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3505
3506        external_in.send(2).await.unwrap();
3507        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3508    }
3509
3510    #[cfg(feature = "deploy")]
3511    #[tokio::test]
3512    async fn top_level_bounded_into_singleton_cardinality() {
3513        let mut deployment = Deployment::new();
3514
3515        let mut flow = FlowBuilder::new();
3516        let node = flow.process::<()>();
3517        let external = flow.external::<()>();
3518
3519        let (input_port, input) =
3520            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3521
3522        let out = sliced! {
3523            let input = use(input, nondet!(/** test */));
3524            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3525            input.cross_singleton(v.into_stream().count())
3526        }
3527        .send_bincode_external(&external);
3528
3529        let nodes = flow
3530            .with_process(&node, deployment.Localhost())
3531            .with_external(&external, deployment.Localhost())
3532            .deploy(&mut deployment);
3533
3534        deployment.deploy().await.unwrap();
3535
3536        let mut external_in = nodes.connect(input_port).await;
3537        let mut external_out = nodes.connect(out).await;
3538
3539        deployment.start().await.unwrap();
3540
3541        external_in.send(1).await.unwrap();
3542        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3543
3544        external_in.send(2).await.unwrap();
3545        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3546    }
3547
3548    #[cfg(feature = "deploy")]
3549    #[tokio::test]
3550    async fn atomic_fold_replays_each_tick() {
3551        let mut deployment = Deployment::new();
3552
3553        let mut flow = FlowBuilder::new();
3554        let node = flow.process::<()>();
3555        let external = flow.external::<()>();
3556
3557        let (input_port, input) =
3558            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3559        let tick = node.tick();
3560
3561        let out = input
3562            .batch(&tick, nondet!(/** test */))
3563            .cross_singleton(
3564                node.source_iter(q!(vec![1, 2, 3]))
3565                    .atomic()
3566                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3567                    .snapshot_atomic(&tick, nondet!(/** test */)),
3568            )
3569            .all_ticks()
3570            .send_bincode_external(&external);
3571
3572        let nodes = flow
3573            .with_process(&node, deployment.Localhost())
3574            .with_external(&external, deployment.Localhost())
3575            .deploy(&mut deployment);
3576
3577        deployment.deploy().await.unwrap();
3578
3579        let mut external_in = nodes.connect(input_port).await;
3580        let mut external_out = nodes.connect(out).await;
3581
3582        deployment.start().await.unwrap();
3583
3584        external_in.send(1).await.unwrap();
3585        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3586
3587        external_in.send(2).await.unwrap();
3588        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3589    }
3590
3591    #[cfg(feature = "deploy")]
3592    #[tokio::test]
3593    async fn unbounded_scan_remembers_state() {
3594        let mut deployment = Deployment::new();
3595
3596        let mut flow = FlowBuilder::new();
3597        let node = flow.process::<()>();
3598        let external = flow.external::<()>();
3599
3600        let (input_port, input) = node.source_external_bincode(&external);
3601        let out = input
3602            .scan(
3603                q!(|| 0),
3604                q!(|acc, v| {
3605                    *acc += v;
3606                    Some(*acc)
3607                }),
3608            )
3609            .send_bincode_external(&external);
3610
3611        let nodes = flow
3612            .with_process(&node, deployment.Localhost())
3613            .with_external(&external, deployment.Localhost())
3614            .deploy(&mut deployment);
3615
3616        deployment.deploy().await.unwrap();
3617
3618        let mut external_in = nodes.connect(input_port).await;
3619        let mut external_out = nodes.connect(out).await;
3620
3621        deployment.start().await.unwrap();
3622
3623        external_in.send(1).await.unwrap();
3624        assert_eq!(external_out.next().await.unwrap(), 1);
3625
3626        external_in.send(2).await.unwrap();
3627        assert_eq!(external_out.next().await.unwrap(), 3);
3628    }
3629
3630    #[cfg(feature = "deploy")]
3631    #[tokio::test]
3632    async fn unbounded_enumerate_remembers_state() {
3633        let mut deployment = Deployment::new();
3634
3635        let mut flow = FlowBuilder::new();
3636        let node = flow.process::<()>();
3637        let external = flow.external::<()>();
3638
3639        let (input_port, input) = node.source_external_bincode(&external);
3640        let out = input.enumerate().send_bincode_external(&external);
3641
3642        let nodes = flow
3643            .with_process(&node, deployment.Localhost())
3644            .with_external(&external, deployment.Localhost())
3645            .deploy(&mut deployment);
3646
3647        deployment.deploy().await.unwrap();
3648
3649        let mut external_in = nodes.connect(input_port).await;
3650        let mut external_out = nodes.connect(out).await;
3651
3652        deployment.start().await.unwrap();
3653
3654        external_in.send(1).await.unwrap();
3655        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3656
3657        external_in.send(2).await.unwrap();
3658        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3659    }
3660
3661    #[cfg(feature = "deploy")]
3662    #[tokio::test]
3663    async fn unbounded_unique_remembers_state() {
3664        let mut deployment = Deployment::new();
3665
3666        let mut flow = FlowBuilder::new();
3667        let node = flow.process::<()>();
3668        let external = flow.external::<()>();
3669
3670        let (input_port, input) =
3671            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3672        let out = input.unique().send_bincode_external(&external);
3673
3674        let nodes = flow
3675            .with_process(&node, deployment.Localhost())
3676            .with_external(&external, deployment.Localhost())
3677            .deploy(&mut deployment);
3678
3679        deployment.deploy().await.unwrap();
3680
3681        let mut external_in = nodes.connect(input_port).await;
3682        let mut external_out = nodes.connect(out).await;
3683
3684        deployment.start().await.unwrap();
3685
3686        external_in.send(1).await.unwrap();
3687        assert_eq!(external_out.next().await.unwrap(), 1);
3688
3689        external_in.send(2).await.unwrap();
3690        assert_eq!(external_out.next().await.unwrap(), 2);
3691
3692        external_in.send(1).await.unwrap();
3693        external_in.send(3).await.unwrap();
3694        assert_eq!(external_out.next().await.unwrap(), 3);
3695    }
3696
3697    #[cfg(feature = "sim")]
3698    #[test]
3699    #[should_panic]
3700    fn sim_batch_nondet_size() {
3701        let mut flow = FlowBuilder::new();
3702        let node = flow.process::<()>();
3703
3704        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3705
3706        let tick = node.tick();
3707        let out_recv = input
3708            .batch(&tick, nondet!(/** test */))
3709            .count()
3710            .all_ticks()
3711            .sim_output();
3712
3713        flow.sim().exhaustive(async || {
3714            in_send.send(());
3715            in_send.send(());
3716            in_send.send(());
3717
3718            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3719        });
3720    }
3721
3722    #[cfg(feature = "sim")]
3723    #[test]
3724    fn sim_batch_preserves_order() {
3725        let mut flow = FlowBuilder::new();
3726        let node = flow.process::<()>();
3727
3728        let (in_send, input) = node.sim_input();
3729
3730        let tick = node.tick();
3731        let out_recv = input
3732            .batch(&tick, nondet!(/** test */))
3733            .all_ticks()
3734            .sim_output();
3735
3736        flow.sim().exhaustive(async || {
3737            in_send.send(1);
3738            in_send.send(2);
3739            in_send.send(3);
3740
3741            out_recv.assert_yields_only([1, 2, 3]).await;
3742        });
3743    }
3744
3745    #[cfg(feature = "sim")]
3746    #[test]
3747    #[should_panic]
3748    fn sim_batch_unordered_shuffles() {
3749        let mut flow = FlowBuilder::new();
3750        let node = flow.process::<()>();
3751
3752        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3753
3754        let tick = node.tick();
3755        let batch = input.batch(&tick, nondet!(/** test */));
3756        let out_recv = batch
3757            .clone()
3758            .min()
3759            .zip(batch.max())
3760            .all_ticks()
3761            .sim_output();
3762
3763        flow.sim().exhaustive(async || {
3764            in_send.send_many_unordered([1, 2, 3]);
3765
3766            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3767                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3768            }
3769        });
3770    }
3771
3772    #[cfg(feature = "sim")]
3773    #[test]
3774    fn sim_batch_unordered_shuffles_count() {
3775        let mut flow = FlowBuilder::new();
3776        let node = flow.process::<()>();
3777
3778        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3779
3780        let tick = node.tick();
3781        let batch = input.batch(&tick, nondet!(/** test */));
3782        let out_recv = batch.all_ticks().sim_output();
3783
3784        let instance_count = flow.sim().exhaustive(async || {
3785            in_send.send_many_unordered([1, 2, 3, 4]);
3786            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3787        });
3788
3789        assert_eq!(
3790            instance_count,
3791            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3792        )
3793    }
3794
3795    #[cfg(feature = "sim")]
3796    #[test]
3797    #[should_panic]
3798    fn sim_observe_order_batched() {
3799        let mut flow = FlowBuilder::new();
3800        let node = flow.process::<()>();
3801
3802        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3803
3804        let tick = node.tick();
3805        let batch = input.batch(&tick, nondet!(/** test */));
3806        let out_recv = batch
3807            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3808            .all_ticks()
3809            .sim_output();
3810
3811        flow.sim().exhaustive(async || {
3812            in_send.send_many_unordered([1, 2, 3, 4]);
3813            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3814        });
3815    }
3816
3817    #[cfg(feature = "sim")]
3818    #[test]
3819    fn sim_observe_order_batched_count() {
3820        let mut flow = FlowBuilder::new();
3821        let node = flow.process::<()>();
3822
3823        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3824
3825        let tick = node.tick();
3826        let batch = input.batch(&tick, nondet!(/** test */));
3827        let out_recv = batch
3828            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3829            .all_ticks()
3830            .sim_output();
3831
3832        let instance_count = flow.sim().exhaustive(async || {
3833            in_send.send_many_unordered([1, 2, 3, 4]);
3834            let _ = out_recv.collect::<Vec<_>>().await;
3835        });
3836
3837        assert_eq!(
3838            instance_count,
3839            192 // 4! * 2^{4 - 1}
3840        )
3841    }
3842
3843    #[cfg(feature = "sim")]
3844    #[test]
3845    fn sim_unordered_count_instance_count() {
3846        let mut flow = FlowBuilder::new();
3847        let node = flow.process::<()>();
3848
3849        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3850
3851        let tick = node.tick();
3852        let out_recv = input
3853            .count()
3854            .snapshot(&tick, nondet!(/** test */))
3855            .all_ticks()
3856            .sim_output();
3857
3858        let instance_count = flow.sim().exhaustive(async || {
3859            in_send.send_many_unordered([1, 2, 3, 4]);
3860            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3861        });
3862
3863        assert_eq!(
3864            instance_count,
3865            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3866        )
3867    }
3868
3869    #[cfg(feature = "sim")]
3870    #[test]
3871    fn sim_top_level_assume_ordering() {
3872        let mut flow = FlowBuilder::new();
3873        let node = flow.process::<()>();
3874
3875        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3876
3877        let out_recv = input
3878            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3879            .sim_output();
3880
3881        let instance_count = flow.sim().exhaustive(async || {
3882            in_send.send_many_unordered([1, 2, 3]);
3883            let mut out = out_recv.collect::<Vec<_>>().await;
3884            out.sort();
3885            assert_eq!(out, vec![1, 2, 3]);
3886        });
3887
3888        assert_eq!(instance_count, 6)
3889    }
3890
3891    #[cfg(feature = "sim")]
3892    #[test]
3893    fn sim_top_level_assume_ordering_cycle_back() {
3894        let mut flow = FlowBuilder::new();
3895        let node = flow.process::<()>();
3896        let node2 = flow.process::<()>();
3897
3898        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3899
3900        let (complete_cycle_back, cycle_back) =
3901            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3902        let ordered = input
3903            .merge_unordered(cycle_back)
3904            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3905        complete_cycle_back.complete(
3906            ordered
3907                .clone()
3908                .map(q!(|v| v + 1))
3909                .filter(q!(|v| v % 2 == 1))
3910                .send(&node2, TCP.fail_stop().bincode())
3911                .send(&node, TCP.fail_stop().bincode()),
3912        );
3913
3914        let out_recv = ordered.sim_output();
3915
3916        let mut saw = false;
3917        let instance_count = flow.sim().exhaustive(async || {
3918            in_send.send_many_unordered([0, 2]);
3919            let out = out_recv.collect::<Vec<_>>().await;
3920
3921            if out.starts_with(&[0, 1, 2]) {
3922                saw = true;
3923            }
3924        });
3925
3926        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3927        assert_eq!(instance_count, 6);
3928    }
3929
3930    #[cfg(feature = "sim")]
3931    #[test]
3932    fn sim_top_level_assume_ordering_cycle_back_tick() {
3933        let mut flow = FlowBuilder::new();
3934        let node = flow.process::<()>();
3935        let node2 = flow.process::<()>();
3936
3937        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3938
3939        let (complete_cycle_back, cycle_back) =
3940            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3941        let ordered = input
3942            .merge_unordered(cycle_back)
3943            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3944        complete_cycle_back.complete(
3945            ordered
3946                .clone()
3947                .batch(&node.tick(), nondet!(/** test */))
3948                .all_ticks()
3949                .map(q!(|v| v + 1))
3950                .filter(q!(|v| v % 2 == 1))
3951                .send(&node2, TCP.fail_stop().bincode())
3952                .send(&node, TCP.fail_stop().bincode()),
3953        );
3954
3955        let out_recv = ordered.sim_output();
3956
3957        let mut saw = false;
3958        let instance_count = flow.sim().exhaustive(async || {
3959            in_send.send_many_unordered([0, 2]);
3960            let out = out_recv.collect::<Vec<_>>().await;
3961
3962            if out.starts_with(&[0, 1, 2]) {
3963                saw = true;
3964            }
3965        });
3966
3967        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3968        assert_eq!(instance_count, 58);
3969    }
3970
3971    #[cfg(feature = "sim")]
3972    #[test]
3973    fn sim_top_level_assume_ordering_multiple() {
3974        let mut flow = FlowBuilder::new();
3975        let node = flow.process::<()>();
3976        let node2 = flow.process::<()>();
3977
3978        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3979        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3980
3981        let (complete_cycle_back, cycle_back) =
3982            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3983        let input1_ordered = input
3984            .clone()
3985            .merge_unordered(cycle_back)
3986            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3987        let foo = input1_ordered
3988            .clone()
3989            .map(q!(|v| v + 3))
3990            .weaken_ordering::<NoOrder>()
3991            .merge_unordered(input2)
3992            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3993
3994        complete_cycle_back.complete(
3995            foo.filter(q!(|v| *v == 3))
3996                .send(&node2, TCP.fail_stop().bincode())
3997                .send(&node, TCP.fail_stop().bincode()),
3998        );
3999
4000        let out_recv = input1_ordered.sim_output();
4001
4002        let mut saw = false;
4003        let instance_count = flow.sim().exhaustive(async || {
4004            in_send.send_many_unordered([0, 1]);
4005            let out = out_recv.collect::<Vec<_>>().await;
4006
4007            if out.starts_with(&[0, 3, 1]) {
4008                saw = true;
4009            }
4010        });
4011
4012        assert!(saw, "did not see an instance with 0, 3, 1 in order");
4013        assert_eq!(instance_count, 24);
4014    }
4015
4016    #[cfg(feature = "sim")]
4017    #[test]
4018    fn sim_atomic_assume_ordering_cycle_back() {
4019        let mut flow = FlowBuilder::new();
4020        let node = flow.process::<()>();
4021        let node2 = flow.process::<()>();
4022
4023        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4024
4025        let (complete_cycle_back, cycle_back) =
4026            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4027        let ordered = input
4028            .merge_unordered(cycle_back)
4029            .atomic()
4030            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4031            .end_atomic();
4032        complete_cycle_back.complete(
4033            ordered
4034                .clone()
4035                .map(q!(|v| v + 1))
4036                .filter(q!(|v| v % 2 == 1))
4037                .send(&node2, TCP.fail_stop().bincode())
4038                .send(&node, TCP.fail_stop().bincode()),
4039        );
4040
4041        let out_recv = ordered.sim_output();
4042
4043        let instance_count = flow.sim().exhaustive(async || {
4044            in_send.send_many_unordered([0, 2]);
4045            let out = out_recv.collect::<Vec<_>>().await;
4046            assert_eq!(out.len(), 4);
4047        });
4048        assert_eq!(instance_count, 22);
4049    }
4050
4051    #[cfg(feature = "deploy")]
4052    #[tokio::test]
4053    async fn partition_evens_odds() {
4054        let mut deployment = Deployment::new();
4055
4056        let mut flow = FlowBuilder::new();
4057        let node = flow.process::<()>();
4058        let external = flow.external::<()>();
4059
4060        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4061        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4062        let evens_port = evens.send_bincode_external(&external);
4063        let odds_port = odds.send_bincode_external(&external);
4064
4065        let nodes = flow
4066            .with_process(&node, deployment.Localhost())
4067            .with_external(&external, deployment.Localhost())
4068            .deploy(&mut deployment);
4069
4070        deployment.deploy().await.unwrap();
4071
4072        let mut evens_out = nodes.connect(evens_port).await;
4073        let mut odds_out = nodes.connect(odds_port).await;
4074
4075        deployment.start().await.unwrap();
4076
4077        let mut even_results = Vec::new();
4078        for _ in 0..3 {
4079            even_results.push(evens_out.next().await.unwrap());
4080        }
4081        even_results.sort();
4082        assert_eq!(even_results, vec![2, 4, 6]);
4083
4084        let mut odd_results = Vec::new();
4085        for _ in 0..3 {
4086            odd_results.push(odds_out.next().await.unwrap());
4087        }
4088        odd_results.sort();
4089        assert_eq!(odd_results, vec![1, 3, 5]);
4090    }
4091
4092    #[cfg(feature = "deploy")]
4093    #[tokio::test]
4094    async fn unconsumed_inspect_still_runs() {
4095        use crate::deploy::DeployCrateWrapper;
4096
4097        let mut deployment = Deployment::new();
4098
4099        let mut flow = FlowBuilder::new();
4100        let node = flow.process::<()>();
4101
4102        // The return value of .inspect() is intentionally dropped.
4103        // Before the Null-root fix, this would silently do nothing.
4104        node.source_iter(q!(0..5))
4105            .inspect(q!(|x| println!("inspect: {}", x)));
4106
4107        let nodes = flow
4108            .with_process(&node, deployment.Localhost())
4109            .deploy(&mut deployment);
4110
4111        deployment.deploy().await.unwrap();
4112
4113        let mut stdout = nodes.get_process(&node).stdout();
4114
4115        deployment.start().await.unwrap();
4116
4117        let mut lines = Vec::new();
4118        for _ in 0..5 {
4119            lines.push(stdout.recv().await.unwrap());
4120        }
4121        lines.sort();
4122        assert_eq!(
4123            lines,
4124            vec![
4125                "inspect: 0",
4126                "inspect: 1",
4127                "inspect: 2",
4128                "inspect: 3",
4129                "inspect: 4",
4130            ]
4131        );
4132    }
4133
4134    #[cfg(feature = "sim")]
4135    #[test]
4136    fn sim_limit() {
4137        let mut flow = FlowBuilder::new();
4138        let node = flow.process::<()>();
4139
4140        let (in_send, input) = node.sim_input();
4141
4142        let out_recv = input.limit(q!(3)).sim_output();
4143
4144        flow.sim().exhaustive(async || {
4145            in_send.send(1);
4146            in_send.send(2);
4147            in_send.send(3);
4148            in_send.send(4);
4149            in_send.send(5);
4150
4151            out_recv.assert_yields_only([1, 2, 3]).await;
4152        });
4153    }
4154
4155    #[cfg(feature = "sim")]
4156    #[test]
4157    fn sim_limit_zero() {
4158        let mut flow = FlowBuilder::new();
4159        let node = flow.process::<()>();
4160
4161        let (in_send, input) = node.sim_input();
4162
4163        let out_recv = input.limit(q!(0)).sim_output();
4164
4165        flow.sim().exhaustive(async || {
4166            in_send.send(1);
4167            in_send.send(2);
4168
4169            out_recv.assert_yields_only::<i32, _>([]).await;
4170        });
4171    }
4172
4173    #[cfg(feature = "sim")]
4174    #[test]
4175    fn sim_merge_ordered() {
4176        let mut flow = FlowBuilder::new();
4177        let node = flow.process::<()>();
4178
4179        let (in_send, input) = node.sim_input();
4180        let (in_send2, input2) = node.sim_input();
4181
4182        let out_recv = input
4183            .merge_ordered(input2, nondet!(/** test */))
4184            .sim_output();
4185
4186        let mut saw_out_of_order = false;
4187        let instances = flow.sim().exhaustive(async || {
4188            in_send.send(1);
4189            in_send.send(2);
4190            in_send2.send(3);
4191            in_send2.send(4);
4192
4193            let out = out_recv.collect::<Vec<_>>().await;
4194
4195            if out == [1, 3, 2, 4] {
4196                saw_out_of_order = true;
4197            }
4198
4199            // Assert ordering preservation: elements from each input must
4200            // appear in their original relative order.
4201            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4202            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4203            assert_eq!(
4204                first_elements,
4205                vec![1, 2],
4206                "first input order violated: {:?}",
4207                out
4208            );
4209            assert_eq!(
4210                second_elements,
4211                vec![3, 4],
4212                "second input order violated: {:?}",
4213                out
4214            );
4215
4216            first_elements.append(&mut second_elements);
4217            first_elements.sort();
4218            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4219        });
4220
4221        assert!(saw_out_of_order);
4222        assert_eq!(instances, 6);
4223    }
4224
4225    /// Tests that merge_ordered passes through elements when only one input
4226    /// has data.
4227    #[cfg(feature = "sim")]
4228    #[test]
4229    fn sim_merge_ordered_one_empty() {
4230        let mut flow = FlowBuilder::new();
4231        let node = flow.process::<()>();
4232
4233        let (in_send, input) = node.sim_input();
4234        let (_in_send2, input2) = node.sim_input();
4235
4236        let out_recv = input
4237            .merge_ordered(input2, nondet!(/** test */))
4238            .sim_output();
4239
4240        let instances = flow.sim().exhaustive(async || {
4241            in_send.send(1);
4242            in_send.send(2);
4243
4244            let out = out_recv.collect::<Vec<_>>().await;
4245            assert_eq!(out, vec![1, 2]);
4246        });
4247
4248        // Only one possible interleaving when one input is empty
4249        assert_eq!(instances, 1);
4250    }
4251
4252    /// Tests that merge_ordered correctly handles feedback cycles.
4253    /// An element output from merge_ordered is filtered and cycled back to
4254    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4255    /// element to arrive and potentially be emitted before elements still
4256    /// waiting on the other input.
4257    #[cfg(feature = "sim")]
4258    #[test]
4259    fn sim_merge_ordered_cycle_back() {
4260        let mut flow = FlowBuilder::new();
4261        let node = flow.process::<()>();
4262
4263        let (in_send, input) = node.sim_input();
4264
4265        // Create a forward ref for the cycle back
4266        let (complete_cycle_back, cycle_back) =
4267            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4268
4269        // merge_ordered: input (external) with cycle_back
4270        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4271
4272        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4273        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4274
4275        let out_recv = merged.sim_output();
4276
4277        // Send 1 and 2. Element 1 should cycle back as 10.
4278        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4279        let mut saw_cycle_before_second = false;
4280        flow.sim().exhaustive(async || {
4281            in_send.send(1);
4282            in_send.send(2);
4283
4284            let out = out_recv.collect::<Vec<_>>().await;
4285
4286            // 10 must always come after 1 (causal dependency)
4287            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4288            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4289            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4290
4291            // Check if we see [1, 10, 2] — the cycled element beats the second input
4292            if out == [1, 10, 2] {
4293                saw_cycle_before_second = true;
4294            }
4295
4296            let mut sorted = out;
4297            sorted.sort();
4298            assert_eq!(sorted, vec![1, 2, 10]);
4299        });
4300
4301        assert!(
4302            saw_cycle_before_second,
4303            "never saw the cycled element arrive before the second input element"
4304        );
4305    }
4306
4307    /// Tests that merge_ordered correctly interleaves when one input has a
4308    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4309    /// element 2 should be able to appear after b's elements.
4310    #[cfg(feature = "sim")]
4311    #[test]
4312    fn sim_merge_ordered_delayed() {
4313        let mut flow = FlowBuilder::new();
4314        let node = flow.process::<()>();
4315
4316        let (in_send, input) = node.sim_input();
4317        let (in_send2, input2) = node.sim_input();
4318
4319        let out_recv = input
4320            .merge_ordered(input2, nondet!(/** test */))
4321            .sim_output();
4322
4323        let mut saw_delayed_interleaving = false;
4324        flow.sim().exhaustive(async || {
4325            // Send 1 from a, and 3, 4 from b
4326            in_send.send(1);
4327            in_send2.send(3);
4328            in_send2.send(4);
4329
4330            // Collect what's available so far
4331            let first_batch = out_recv.collect::<Vec<_>>().await;
4332
4333            // Now send the delayed element 2 from a
4334            in_send.send(2);
4335            let second_batch = out_recv.collect::<Vec<_>>().await;
4336
4337            let mut all: Vec<_> = first_batch
4338                .iter()
4339                .chain(second_batch.iter())
4340                .copied()
4341                .collect();
4342
4343            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4344            if all == [1, 3, 4, 2] {
4345                saw_delayed_interleaving = true;
4346            }
4347
4348            all.sort();
4349            assert_eq!(all, vec![1, 2, 3, 4]);
4350        });
4351
4352        assert!(saw_delayed_interleaving);
4353    }
4354
4355    /// Deploy test: merge_ordered with a delayed element on one input.
4356    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4357    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4358    /// both inputs are pulled and the delayed element arrives later.
4359    #[cfg(feature = "deploy")]
4360    #[tokio::test]
4361    async fn deploy_merge_ordered_delayed() {
4362        let mut deployment = Deployment::new();
4363
4364        let mut flow = FlowBuilder::new();
4365        let node = flow.process::<()>();
4366        let external = flow.external::<()>();
4367
4368        let (input_a_port, input_a) = node.source_external_bincode(&external);
4369        let (input_b_port, input_b) = node.source_external_bincode(&external);
4370
4371        let out = input_a
4372            .assume_ordering(nondet!(/** test */))
4373            .merge_ordered(
4374                input_b.assume_ordering(nondet!(/** test */)),
4375                nondet!(/** test */),
4376            )
4377            .send_bincode_external(&external);
4378
4379        let nodes = flow
4380            .with_process(&node, deployment.Localhost())
4381            .with_external(&external, deployment.Localhost())
4382            .deploy(&mut deployment);
4383
4384        deployment.deploy().await.unwrap();
4385
4386        let mut ext_a = nodes.connect(input_a_port).await;
4387        let mut ext_b = nodes.connect(input_b_port).await;
4388        let mut ext_out = nodes.connect(out).await;
4389
4390        deployment.start().await.unwrap();
4391
4392        // Send a=1, b=3, b=4
4393        ext_a.send(1).await.unwrap();
4394        ext_b.send(3).await.unwrap();
4395        ext_b.send(4).await.unwrap();
4396
4397        // Collect the first 3 elements
4398        let mut received = Vec::new();
4399        for _ in 0..3 {
4400            received.push(ext_out.next().await.unwrap());
4401        }
4402
4403        // Now send the delayed a=2
4404        ext_a.send(2).await.unwrap();
4405        received.push(ext_out.next().await.unwrap());
4406
4407        // All elements should be present
4408        received.sort();
4409        assert_eq!(received, vec![1, 2, 3, 4]);
4410    }
4411
4412    #[cfg(feature = "deploy")]
4413    #[tokio::test]
4414    async fn monotone_fold_threshold() {
4415        use crate::properties::manual_proof;
4416
4417        let mut deployment = Deployment::new();
4418
4419        let mut flow = FlowBuilder::new();
4420        let node = flow.process::<()>();
4421        let external = flow.external::<()>();
4422
4423        let in_unbounded: super::Stream<_, _> =
4424            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4425        let sum = in_unbounded.fold(
4426            q!(|| 0),
4427            q!(
4428                |sum, v| {
4429                    *sum += v;
4430                },
4431                monotone = manual_proof!(/** test */)
4432            ),
4433        );
4434
4435        let threshold_out = sum
4436            .threshold_greater_or_equal(node.singleton(q!(7)))
4437            .send_bincode_external(&external);
4438
4439        let nodes = flow
4440            .with_process(&node, deployment.Localhost())
4441            .with_external(&external, deployment.Localhost())
4442            .deploy(&mut deployment);
4443
4444        deployment.deploy().await.unwrap();
4445
4446        let mut threshold_out = nodes.connect(threshold_out).await;
4447
4448        deployment.start().await.unwrap();
4449
4450        assert_eq!(threshold_out.next().await.unwrap(), 7);
4451    }
4452
4453    #[cfg(feature = "deploy")]
4454    #[tokio::test]
4455    async fn monotone_count_threshold() {
4456        let mut deployment = Deployment::new();
4457
4458        let mut flow = FlowBuilder::new();
4459        let node = flow.process::<()>();
4460        let external = flow.external::<()>();
4461
4462        let in_unbounded: super::Stream<_, _> =
4463            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4464        let sum = in_unbounded.count();
4465
4466        let threshold_out = sum
4467            .threshold_greater_or_equal(node.singleton(q!(3)))
4468            .send_bincode_external(&external);
4469
4470        let nodes = flow
4471            .with_process(&node, deployment.Localhost())
4472            .with_external(&external, deployment.Localhost())
4473            .deploy(&mut deployment);
4474
4475        deployment.deploy().await.unwrap();
4476
4477        let mut threshold_out = nodes.connect(threshold_out).await;
4478
4479        deployment.start().await.unwrap();
4480
4481        assert_eq!(threshold_out.next().await.unwrap(), 3);
4482    }
4483
4484    #[cfg(feature = "deploy")]
4485    #[tokio::test]
4486    async fn monotone_map_order_preserving_threshold() {
4487        use crate::properties::manual_proof;
4488
4489        let mut deployment = Deployment::new();
4490
4491        let mut flow = FlowBuilder::new();
4492        let node = flow.process::<()>();
4493        let external = flow.external::<()>();
4494
4495        let in_unbounded: super::Stream<_, _> =
4496            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4497        let sum = in_unbounded.fold(
4498            q!(|| 0),
4499            q!(
4500                |sum, v| {
4501                    *sum += v;
4502                },
4503                monotone = manual_proof!(/** test */)
4504            ),
4505        );
4506
4507        // map with order_preserving should preserve monotonicity
4508        let doubled = sum.map(q!(
4509            |v| v * 2,
4510            order_preserving = manual_proof!(/** doubling preserves order */)
4511        ));
4512
4513        let threshold_out = doubled
4514            .threshold_greater_or_equal(node.singleton(q!(14)))
4515            .send_bincode_external(&external);
4516
4517        let nodes = flow
4518            .with_process(&node, deployment.Localhost())
4519            .with_external(&external, deployment.Localhost())
4520            .deploy(&mut deployment);
4521
4522        deployment.deploy().await.unwrap();
4523
4524        let mut threshold_out = nodes.connect(threshold_out).await;
4525
4526        deployment.start().await.unwrap();
4527
4528        assert_eq!(threshold_out.next().await.unwrap(), 14);
4529    }
4530
4531    // === Compile-time type tests for join/cross_product ordering ===
4532
4533    #[cfg(any(feature = "deploy", feature = "sim"))]
4534    mod join_ordering_type_tests {
4535        use crate::live_collections::boundedness::{Bounded, Unbounded};
4536        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4537        use crate::location::{Location, Process};
4538
4539        #[expect(dead_code, reason = "compile-time type test")]
4540        fn join_unbounded_with_bounded_preserves_order<'a>(
4541            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4542            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4543        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4544            left.join(right)
4545        }
4546
4547        #[expect(dead_code, reason = "compile-time type test")]
4548        fn join_unbounded_with_unbounded_is_no_order<'a>(
4549            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4550            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4551        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4552            left.join(right)
4553        }
4554
4555        #[expect(dead_code, reason = "compile-time type test")]
4556        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4557            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4558            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4559        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4560            left.join(right)
4561        }
4562
4563        #[expect(dead_code, reason = "compile-time type test")]
4564        fn join_unbounded_noorder_with_bounded<'a>(
4565            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4566            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4567        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4568            left.join(right)
4569        }
4570
4571        // === Compile-time type tests for cross_product ordering ===
4572
4573        #[expect(dead_code, reason = "compile-time type test")]
4574        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4575            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4576            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4577        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4578            left.cross_product(right)
4579        }
4580
4581        #[expect(dead_code, reason = "compile-time type test")]
4582        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4583            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4584            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4585        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4586            left.cross_product(right)
4587        }
4588
4589        #[expect(dead_code, reason = "compile-time type test")]
4590        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4591            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4592            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4593        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4594            left.cross_product(right)
4595        }
4596    } // mod join_ordering_type_tests
4597
4598    // === Runtime correctness tests for bounded join/cross_product ===
4599
4600    #[cfg(feature = "sim")]
4601    #[test]
4602    fn cross_product_mixed_boundedness_correctness() {
4603        use stageleft::q;
4604
4605        use crate::compile::builder::FlowBuilder;
4606        use crate::nondet::nondet;
4607
4608        let mut flow = FlowBuilder::new();
4609        let process = flow.process::<()>();
4610        let tick = process.tick();
4611
4612        let left = process.source_iter(q!(vec![1, 2]));
4613        let right = process
4614            .source_iter(q!(vec!['a', 'b']))
4615            .batch(&tick, nondet!(/** test */))
4616            .all_ticks();
4617
4618        let out = left.cross_product(right).sim_output();
4619
4620        flow.sim().exhaustive(async || {
4621            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4622                .await;
4623        });
4624    }
4625
4626    #[cfg(feature = "sim")]
4627    #[test]
4628    fn join_mixed_boundedness_correctness() {
4629        use stageleft::q;
4630
4631        use crate::compile::builder::FlowBuilder;
4632        use crate::nondet::nondet;
4633
4634        let mut flow = FlowBuilder::new();
4635        let process = flow.process::<()>();
4636        let tick = process.tick();
4637
4638        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4639        let right = process
4640            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4641            .batch(&tick, nondet!(/** test */))
4642            .all_ticks();
4643
4644        let out = left.join(right).sim_output();
4645
4646        flow.sim().exhaustive(async || {
4647            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4648                .await;
4649        });
4650    }
4651
4652    #[cfg(feature = "sim")]
4653    #[test]
4654    fn sim_merge_unordered_independent_atomics() {
4655        let mut flow = FlowBuilder::new();
4656        let node = flow.process::<()>();
4657
4658        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4659        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4660
4661        let out = input1
4662            .atomic()
4663            .merge_unordered(input2.atomic())
4664            .end_atomic()
4665            .sim_output();
4666
4667        flow.sim().exhaustive(async || {
4668            in1_send.send(1);
4669            in2_send.send(2);
4670
4671            out.assert_yields_only_unordered(vec![1, 2]).await;
4672        });
4673    }
4674
4675    #[cfg(feature = "deploy")]
4676    #[tokio::test]
4677    async fn test_stream_ref() {
4678        let mut deployment = Deployment::new();
4679
4680        let mut flow = FlowBuilder::new();
4681        let external = flow.external::<()>();
4682        let p1 = flow.process::<()>();
4683
4684        // Create a bounded stream (source_iter is bounded within a tick)
4685        let my_stream = p1.source_iter(q!(1..=5i32));
4686
4687        let stream_ref = my_stream.by_ref();
4688
4689        // Use the stream ref to get the vec's length
4690        let out_port = p1
4691            .source_iter(q!([()]))
4692            .map(q!(|_| stream_ref.len() as i32))
4693            .send_bincode_external(&external);
4694
4695        // Also consume the stream via pipe
4696        my_stream.for_each(q!(|_| {}));
4697
4698        let nodes = flow
4699            .with_default_optimize()
4700            .with_process(&p1, deployment.Localhost())
4701            .with_external(&external, deployment.Localhost())
4702            .deploy(&mut deployment);
4703
4704        deployment.deploy().await.unwrap();
4705
4706        let mut out_recv = nodes.connect(out_port).await;
4707
4708        deployment.start().await.unwrap();
4709
4710        let result = out_recv.next().await.unwrap();
4711        // stream has 5 elements
4712        assert_eq!(result, 5);
4713    }
4714
4715    #[cfg(feature = "deploy")]
4716    #[tokio::test]
4717    async fn test_stream_ref_contents() {
4718        let mut deployment = Deployment::new();
4719
4720        let mut flow = FlowBuilder::new();
4721        let external = flow.external::<()>();
4722        let p1 = flow.process::<()>();
4723
4724        // Create a bounded stream
4725        let my_stream = p1.source_iter(q!(1..=3i32));
4726
4727        let stream_ref = my_stream.by_ref();
4728
4729        // Sum the referenced vec's contents
4730        let out_port = p1
4731            .source_iter(q!([()]))
4732            .map(q!(|_| stream_ref.iter().sum::<i32>()))
4733            .send_bincode_external(&external);
4734
4735        my_stream.for_each(q!(|_| {}));
4736
4737        let nodes = flow
4738            .with_default_optimize()
4739            .with_process(&p1, deployment.Localhost())
4740            .with_external(&external, deployment.Localhost())
4741            .deploy(&mut deployment);
4742
4743        deployment.deploy().await.unwrap();
4744
4745        let mut out_recv = nodes.connect(out_port).await;
4746
4747        deployment.start().await.unwrap();
4748
4749        let result = out_recv.next().await.unwrap();
4750        // sum of 1+2+3 = 6
4751        assert_eq!(result, 6);
4752    }
4753
4754    #[cfg(feature = "deploy")]
4755    #[tokio::test]
4756    async fn test_stream_ref_no_consumer() {
4757        let mut deployment = Deployment::new();
4758
4759        let mut flow = FlowBuilder::new();
4760        let external = flow.external::<()>();
4761        let p1 = flow.process::<()>();
4762
4763        // Create a bounded stream — no pipe consumer, only ref
4764        let my_stream = p1.source_iter(q!(1..=4i32));
4765
4766        let stream_ref = my_stream.by_ref();
4767
4768        let out_port = p1
4769            .source_iter(q!([()]))
4770            .map(q!(|_| stream_ref.len() as i32))
4771            .send_bincode_external(&external);
4772
4773        let nodes = flow
4774            .with_default_optimize()
4775            .with_process(&p1, deployment.Localhost())
4776            .with_external(&external, deployment.Localhost())
4777            .deploy(&mut deployment);
4778
4779        deployment.deploy().await.unwrap();
4780
4781        let mut out_recv = nodes.connect(out_port).await;
4782
4783        deployment.start().await.unwrap();
4784
4785        let result = out_recv.next().await.unwrap();
4786        assert_eq!(result, 4);
4787    }
4788
4789    #[cfg(feature = "deploy")]
4790    #[tokio::test]
4791    async fn test_stream_mut() {
4792        let mut deployment = Deployment::new();
4793
4794        let mut flow = FlowBuilder::new();
4795        let external = flow.external::<()>();
4796        let p1 = flow.process::<()>();
4797
4798        // Create a bounded stream
4799        let my_stream = p1.source_iter(q!(1..=5i32));
4800
4801        let stream_mut = my_stream.by_mut();
4802
4803        // Mutably reference the buffer to retain only items > 3
4804        let out_port = p1
4805            .source_iter(q!([()]))
4806            .map(q!(|_| {
4807                stream_mut.retain(|x| *x > 3);
4808                stream_mut.len() as i32
4809            }))
4810            .send_bincode_external(&external);
4811
4812        my_stream.for_each(q!(|_| {}));
4813
4814        let nodes = flow
4815            .with_default_optimize()
4816            .with_process(&p1, deployment.Localhost())
4817            .with_external(&external, deployment.Localhost())
4818            .deploy(&mut deployment);
4819
4820        deployment.deploy().await.unwrap();
4821
4822        let mut out_recv = nodes.connect(out_port).await;
4823
4824        deployment.start().await.unwrap();
4825
4826        let result = out_recv.next().await.unwrap();
4827        // After retain(> 3): [4, 5] => len = 2
4828        assert_eq!(result, 2);
4829    }
4830}