hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30use crate::prelude::ManualProof;
31use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
32
33pub mod networking;
34
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds require every marker to expose, via [`MinOrder`], the weaker
/// of itself and any other marker: combining a marker with itself or with
/// [`TotalOrder`] yields the marker itself, while combining it with [`NoOrder`]
/// always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
43
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it cannot be instantiated; it is used as a
/// type-level marker.
pub enum TotalOrder {}

// Ties the type-level marker to its IR counterpart, `StreamOrder::TotalOrder`.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
53
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it cannot be instantiated; it is used as a
/// type-level marker.
pub enum NoOrder {}

// Ties the type-level marker to its IR counterpart, `StreamOrder::NoOrder`.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
65
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` and `Other` are the two orderings being combined; the result is exposed
/// as the associated type [`MinOrder::Min`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
72
// The four impls below enumerate every (Self, Other) pair of ordering markers.
// The result is `TotalOrder` only when both sides are `TotalOrder`.

// TotalOrder combined with NoOrder: NoOrder is the weaker one.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder combined with TotalOrder: the ordering is kept.
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder combined with TotalOrder: NoOrder is the weaker one.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder combined with NoOrder: stays NoOrder.
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
92
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds require every marker to expose, via [`MinRetries`], the
/// weaker of itself and any other marker: combining a marker with itself or with
/// [`ExactlyOnce`] yields the marker itself, while combining it with
/// [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
103
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it cannot be instantiated; it is used as a
/// type-level marker.
pub enum ExactlyOnce {}

// Ties the type-level marker to its IR counterpart, `StreamRetry::ExactlyOnce`.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
112
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it cannot be instantiated; it is used as a
/// type-level marker.
pub enum AtLeastOnce {}

// Ties the type-level marker to its IR counterpart, `StreamRetry::AtLeastOnce`.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` and `Other` are the two guarantees being combined; the result is exposed
/// as the associated type [`MinRetries::Min`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
128
// The four impls below enumerate every (Self, Other) pair of retry markers.
// The result is `ExactlyOnce` only when both sides are `ExactlyOnce`.

// ExactlyOnce combined with AtLeastOnce: AtLeastOnce is the weaker one.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce combined with ExactlyOnce: the guarantee is kept.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce combined with ExactlyOnce: AtLeastOnce is the weaker one.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce combined with AtLeastOnce: stays AtLeastOnce.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
148
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location handle this stream was created with; returned by `Stream::location`.
    pub(crate) location: Loc,
    // The IR node that produces this stream. It sits in a `RefCell` because the
    // `Clone` impl rewrites the node in place while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker naming all type parameters, including those that no
    // field stores directly.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
180
181impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
182    for Stream<T, L, Unbounded, O, R>
183where
184    L: Location<'a>,
185{
186    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
187        let new_meta = stream
188            .location
189            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
190
191        Stream {
192            location: stream.location,
193            ir_node: RefCell::new(HydroNode::Cast {
194                inner: Box::new(stream.ir_node.into_inner()),
195                metadata: new_meta,
196            }),
197            _phantom: PhantomData,
198        }
199    }
200}
201
// Lets a totally ordered stream be used wherever an unordered one is expected
// (e.g. through `.into()`), keeping every other type parameter unchanged.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        // Delegates to `weaken_ordering`, which is defined outside this part of the
        // file; the target ordering is inferred from the return type.
        stream.weaken_ordering()
    }
}
211
// Lets an exactly-once stream be used wherever an at-least-once one is expected
// (e.g. through `.into()`), keeping every other type parameter unchanged.
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        // Delegates to `weaken_retries`, which is defined outside this part of the
        // file; the target retry guarantee is inferred from the return type.
        stream.weaken_retries()
    }
}
221
// Implements the `DeferTick` trait for bounded streams that live inside a `Tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards to `Stream::defer_tick` using an explicit path.
        // NOTE(review): presumably this resolves to an inherent method defined
        // outside this part of the file rather than back to this trait method —
        // confirm before changing the call syntax.
        Stream::defer_tick(self)
    }
}
230
231impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
232    for Stream<T, Tick<L>, Bounded, O, R>
233where
234    L: Location<'a>,
235{
236    type Location = Tick<L>;
237
238    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
239        Stream::new(
240            location.clone(),
241            HydroNode::CycleSource {
242                ident,
243                metadata: location.new_node_metadata(Self::collection_kind()),
244            },
245        )
246    }
247}
248
249impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
250    for Stream<T, Tick<L>, Bounded, O, R>
251where
252    L: Location<'a>,
253{
254    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
255        assert_eq!(
256            Location::id(&self.location),
257            expected_location,
258            "locations do not match"
259        );
260        self.location
261            .flow_state()
262            .borrow_mut()
263            .push_root(HydroRoot::CycleSink {
264                ident,
265                input: Box::new(self.ir_node.into_inner()),
266                op_metadata: HydroIrOpMetadata::new(),
267            });
268    }
269}
270
271impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
272    for Stream<T, L, B, O, R>
273where
274    L: Location<'a> + NoTick,
275{
276    type Location = L;
277
278    fn create_source(ident: syn::Ident, location: L) -> Self {
279        Stream::new(
280            location.clone(),
281            HydroNode::CycleSource {
282                ident,
283                metadata: location.new_node_metadata(Self::collection_kind()),
284            },
285        )
286    }
287}
288
289impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
290    for Stream<T, L, B, O, R>
291where
292    L: Location<'a> + NoTick,
293{
294    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
295        assert_eq!(
296            Location::id(&self.location),
297            expected_location,
298            "locations do not match"
299        );
300        self.location
301            .flow_state()
302            .borrow_mut()
303            .push_root(HydroRoot::CycleSink {
304                ident,
305                input: Box::new(self.ir_node.into_inner()),
306                op_metadata: HydroIrOpMetadata::new(),
307            });
308    }
309}
310
311impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
312where
313    T: Clone,
314    L: Location<'a>,
315{
316    fn clone(&self) -> Self {
317        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
318            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
319            *self.ir_node.borrow_mut() = HydroNode::Tee {
320                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
321                metadata: self.location.new_node_metadata(Self::collection_kind()),
322            };
323        }
324
325        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
326            Stream {
327                location: self.location.clone(),
328                ir_node: HydroNode::Tee {
329                    inner: TeeNode(inner.0.clone()),
330                    metadata: metadata.clone(),
331                }
332                .into(),
333                _phantom: PhantomData,
334            }
335        } else {
336            unreachable!()
337        }
338    }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
342where
343    L: Location<'a>,
344{
345    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
346        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
347        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
348
349        Stream {
350            location,
351            ir_node: RefCell::new(ir_node),
352            _phantom: PhantomData,
353        }
354    }
355
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The value is borrowed from the stream; no clone is made.
    pub fn location(&self) -> &L {
        &self.location
    }
360
361    pub(crate) fn collection_kind() -> CollectionKind {
362        CollectionKind::Stream {
363            bound: B::BOUND_KIND,
364            order: O::ORDERING_KIND,
365            retry: R::RETRIES_KIND,
366            element_type: quote_type::<T>().into(),
367        }
368    }
369
370    /// Produces a stream based on invoking `f` on each element.
371    /// If you do not want to modify the stream and instead only want to view
372    /// each item use [`Stream::inspect`] instead.
373    ///
374    /// # Example
375    /// ```rust
376    /// # #[cfg(feature = "deploy")] {
377    /// # use hydro_lang::prelude::*;
378    /// # use futures::StreamExt;
379    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
380    /// let words = process.source_iter(q!(vec!["hello", "world"]));
381    /// words.map(q!(|x| x.to_uppercase()))
382    /// # }, |mut stream| async move {
383    /// # for w in vec!["HELLO", "WORLD"] {
384    /// #     assert_eq!(stream.next().await.unwrap(), w);
385    /// # }
386    /// # }));
387    /// # }
388    /// ```
389    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
390    where
391        F: Fn(T) -> U + 'a,
392    {
393        let f = f.splice_fn1_ctx(&self.location).into();
394        Stream::new(
395            self.location.clone(),
396            HydroNode::Map {
397                f,
398                input: Box::new(self.ir_node.into_inner()),
399                metadata: self
400                    .location
401                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
402            },
403        )
404    }
405
406    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
407    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
408    /// for the output type `U` must produce items in a **deterministic** order.
409    ///
410    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
411    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
412    ///
413    /// # Example
414    /// ```rust
415    /// # #[cfg(feature = "deploy")] {
416    /// # use hydro_lang::prelude::*;
417    /// # use futures::StreamExt;
418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
419    /// process
420    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
421    ///     .flat_map_ordered(q!(|x| x))
422    /// # }, |mut stream| async move {
423    /// // 1, 2, 3, 4
424    /// # for w in (1..5) {
425    /// #     assert_eq!(stream.next().await.unwrap(), w);
426    /// # }
427    /// # }));
428    /// # }
429    /// ```
430    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
431    where
432        I: IntoIterator<Item = U>,
433        F: Fn(T) -> I + 'a,
434    {
435        let f = f.splice_fn1_ctx(&self.location).into();
436        Stream::new(
437            self.location.clone(),
438            HydroNode::FlatMap {
439                f,
440                input: Box::new(self.ir_node.into_inner()),
441                metadata: self
442                    .location
443                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
444            },
445        )
446    }
447
448    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
449    /// for the output type `U` to produce items in any order.
450    ///
451    /// # Example
452    /// ```rust
453    /// # #[cfg(feature = "deploy")] {
454    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
457    /// process
458    ///     .source_iter(q!(vec![
459    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
460    ///         std::collections::HashSet::from_iter(vec![3, 4]),
461    ///     ]))
462    ///     .flat_map_unordered(q!(|x| x))
463    /// # }, |mut stream| async move {
464    /// // 1, 2, 3, 4, but in no particular order
465    /// # let mut results = Vec::new();
466    /// # for w in (1..5) {
467    /// #     results.push(stream.next().await.unwrap());
468    /// # }
469    /// # results.sort();
470    /// # assert_eq!(results, vec![1, 2, 3, 4]);
471    /// # }));
472    /// # }
473    /// ```
474    pub fn flat_map_unordered<U, I, F>(
475        self,
476        f: impl IntoQuotedMut<'a, F, L>,
477    ) -> Stream<U, L, B, NoOrder, R>
478    where
479        I: IntoIterator<Item = U>,
480        F: Fn(T) -> I + 'a,
481    {
482        let f = f.splice_fn1_ctx(&self.location).into();
483        Stream::new(
484            self.location.clone(),
485            HydroNode::FlatMap {
486                f,
487                input: Box::new(self.ir_node.into_inner()),
488                metadata: self
489                    .location
490                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
491            },
492        )
493    }
494
495    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
496    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
497    ///
498    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
499    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
501    /// ```rust
502    /// # #[cfg(feature = "deploy")] {
503    /// # use hydro_lang::prelude::*;
504    /// # use futures::StreamExt;
505    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
506    /// process
507    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
508    ///     .flatten_ordered()
509    /// # }, |mut stream| async move {
510    /// // 1, 2, 3, 4
511    /// # for w in (1..5) {
512    /// #     assert_eq!(stream.next().await.unwrap(), w);
513    /// # }
514    /// # }));
515    /// # }
516    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: each element is
        // itself the iterable to expand.
        self.flat_map_ordered(q!(|d| d))
    }
523
524    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
525    /// for the element type `T` to produce items in any order.
526    ///
527    /// # Example
528    /// ```rust
529    /// # #[cfg(feature = "deploy")] {
530    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
531    /// # use futures::StreamExt;
532    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
533    /// process
534    ///     .source_iter(q!(vec![
535    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
536    ///         std::collections::HashSet::from_iter(vec![3, 4]),
537    ///     ]))
538    ///     .flatten_unordered()
539    /// # }, |mut stream| async move {
540    /// // 1, 2, 3, 4, but in no particular order
541    /// # let mut results = Vec::new();
542    /// # for w in (1..5) {
543    /// #     results.push(stream.next().await.unwrap());
544    /// # }
545    /// # results.sort();
546    /// # assert_eq!(results, vec![1, 2, 3, 4]);
547    /// # }));
548    /// # }
549    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure; the unordered
        // variant yields a `NoOrder` stream.
        self.flat_map_unordered(q!(|d| d))
    }
556
557    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
558    /// `f`, preserving the order of the elements.
559    ///
560    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
561    /// not modify or take ownership of the values. If you need to modify the values while filtering
562    /// use [`Stream::filter_map`] instead.
563    ///
564    /// # Example
565    /// ```rust
566    /// # #[cfg(feature = "deploy")] {
567    /// # use hydro_lang::prelude::*;
568    /// # use futures::StreamExt;
569    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
570    /// process
571    ///     .source_iter(q!(vec![1, 2, 3, 4]))
572    ///     .filter(q!(|&x| x > 2))
573    /// # }, |mut stream| async move {
574    /// // 3, 4
575    /// # for w in (3..5) {
576    /// #     assert_eq!(stream.next().await.unwrap(), w);
577    /// # }
578    /// # }));
579    /// # }
580    /// ```
581    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
582    where
583        F: Fn(&T) -> bool + 'a,
584    {
585        let f = f.splice_fn1_borrow_ctx(&self.location).into();
586        Stream::new(
587            self.location.clone(),
588            HydroNode::Filter {
589                f,
590                input: Box::new(self.ir_node.into_inner()),
591                metadata: self.location.new_node_metadata(Self::collection_kind()),
592            },
593        )
594    }
595
596    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
597    ///
598    /// # Example
599    /// ```rust
600    /// # #[cfg(feature = "deploy")] {
601    /// # use hydro_lang::prelude::*;
602    /// # use futures::StreamExt;
603    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
604    /// process
605    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
606    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
607    /// # }, |mut stream| async move {
608    /// // 1, 2
609    /// # for w in (1..3) {
610    /// #     assert_eq!(stream.next().await.unwrap(), w);
611    /// # }
612    /// # }));
613    /// # }
614    /// ```
615    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
616    where
617        F: Fn(T) -> Option<U> + 'a,
618    {
619        let f = f.splice_fn1_ctx(&self.location).into();
620        Stream::new(
621            self.location.clone(),
622            HydroNode::FilterMap {
623                f,
624                input: Box::new(self.ir_node.into_inner()),
625                metadata: self
626                    .location
627                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
628            },
629        )
630    }
631
632    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
633    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
634    /// If `other` is an empty [`Optional`], no values will be produced.
635    ///
636    /// # Example
637    /// ```rust
638    /// # #[cfg(feature = "deploy")] {
639    /// # use hydro_lang::prelude::*;
640    /// # use futures::StreamExt;
641    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
642    /// let tick = process.tick();
643    /// let batch = process
644    ///   .source_iter(q!(vec![1, 2, 3, 4]))
645    ///   .batch(&tick, nondet!(/** test */));
646    /// let count = batch.clone().count(); // `count()` returns a singleton
647    /// batch.cross_singleton(count).all_ticks()
648    /// # }, |mut stream| async move {
649    /// // (1, 4), (2, 4), (3, 4), (4, 4)
650    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
651    /// #     assert_eq!(stream.next().await.unwrap(), w);
652    /// # }
653    /// # }));
654    /// # }
655    /// ```
656    pub fn cross_singleton<O2>(
657        self,
658        other: impl Into<Optional<O2, L, Bounded>>,
659    ) -> Stream<(T, O2), L, B, O, R>
660    where
661        O2: Clone,
662    {
663        let other: Optional<O2, L, Bounded> = other.into();
664        check_matching_location(&self.location, &other.location);
665
666        Stream::new(
667            self.location.clone(),
668            HydroNode::CrossSingleton {
669                left: Box::new(self.ir_node.into_inner()),
670                right: Box::new(other.ir_node.into_inner()),
671                metadata: self
672                    .location
673                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
674            },
675        )
676    }
677
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-empty, otherwise the output is empty.
679    ///
680    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
681    /// leader of a cluster.
682    ///
683    /// # Example
684    /// ```rust
685    /// # #[cfg(feature = "deploy")] {
686    /// # use hydro_lang::prelude::*;
687    /// # use futures::StreamExt;
688    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
689    /// let tick = process.tick();
690    /// // ticks are lazy by default, forces the second tick to run
691    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
692    ///
693    /// let batch_first_tick = process
694    ///   .source_iter(q!(vec![1, 2, 3, 4]))
695    ///   .batch(&tick, nondet!(/** test */));
696    /// let batch_second_tick = process
697    ///   .source_iter(q!(vec![5, 6, 7, 8]))
698    ///   .batch(&tick, nondet!(/** test */))
699    ///   .defer_tick(); // appears on the second tick
700    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
701    /// batch_first_tick.chain(batch_second_tick)
702    ///   .filter_if_some(some_on_first_tick)
703    ///   .all_ticks()
704    /// # }, |mut stream| async move {
705    /// // [1, 2, 3, 4]
706    /// # for w in vec![1, 2, 3, 4] {
707    /// #     assert_eq!(stream.next().await.unwrap(), w);
708    /// # }
709    /// # }));
710    /// # }
711    /// ```
712    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
713        self.cross_singleton(signal.map(q!(|_u| ())))
714            .map(q!(|(d, _signal)| d))
715    }
716
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is empty, otherwise the output is empty.
718    ///
719    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
720    /// some local state.
721    ///
722    /// # Example
723    /// ```rust
724    /// # #[cfg(feature = "deploy")] {
725    /// # use hydro_lang::prelude::*;
726    /// # use futures::StreamExt;
727    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
728    /// let tick = process.tick();
729    /// // ticks are lazy by default, forces the second tick to run
730    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
731    ///
732    /// let batch_first_tick = process
733    ///   .source_iter(q!(vec![1, 2, 3, 4]))
734    ///   .batch(&tick, nondet!(/** test */));
735    /// let batch_second_tick = process
736    ///   .source_iter(q!(vec![5, 6, 7, 8]))
737    ///   .batch(&tick, nondet!(/** test */))
738    ///   .defer_tick(); // appears on the second tick
739    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
740    /// batch_first_tick.chain(batch_second_tick)
741    ///   .filter_if_none(some_on_first_tick)
742    ///   .all_ticks()
743    /// # }, |mut stream| async move {
744    /// // [5, 6, 7, 8]
745    /// # for w in vec![5, 6, 7, 8] {
746    /// #     assert_eq!(stream.next().await.unwrap(), w);
747    /// # }
748    /// # }));
749    /// # }
750    /// ```
751    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
752        self.filter_if_some(
753            other
754                .map(q!(|_| ()))
755                .into_singleton()
756                .filter(q!(|o| o.is_none())),
757        )
758    }
759
760    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
761    /// tupled pairs in a non-deterministic order.
762    ///
763    /// # Example
764    /// ```rust
765    /// # #[cfg(feature = "deploy")] {
766    /// # use hydro_lang::prelude::*;
767    /// # use std::collections::HashSet;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// let tick = process.tick();
771    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
772    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
773    /// stream1.cross_product(stream2)
774    /// # }, |mut stream| async move {
775    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # for _ in 0..expected.len() {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
777    /// # }));
778    /// # }
779    /// ```
780    pub fn cross_product<T2, O2: Ordering>(
781        self,
782        other: Stream<T2, L, B, O2, R>,
783    ) -> Stream<(T, T2), L, B, NoOrder, R>
784    where
785        T: Clone,
786        T2: Clone,
787    {
788        check_matching_location(&self.location, &other.location);
789
790        Stream::new(
791            self.location.clone(),
792            HydroNode::CrossProduct {
793                left: Box::new(self.ir_node.into_inner()),
794                right: Box::new(other.ir_node.into_inner()),
795                metadata: self
796                    .location
797                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
798            },
799        )
800    }
801
802    /// Takes one stream as input and filters out any duplicate occurrences. The output
803    /// contains all unique values from the input.
804    ///
805    /// # Example
806    /// ```rust
807    /// # #[cfg(feature = "deploy")] {
808    /// # use hydro_lang::prelude::*;
809    /// # use futures::StreamExt;
810    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
811    /// let tick = process.tick();
812    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
813    /// # }, |mut stream| async move {
814    /// # for w in vec![1, 2, 3, 4] {
815    /// #     assert_eq!(stream.next().await.unwrap(), w);
816    /// # }
817    /// # }));
818    /// # }
819    /// ```
820    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
821    where
822        T: Eq + Hash,
823    {
824        Stream::new(
825            self.location.clone(),
826            HydroNode::Unique {
827                input: Box::new(self.ir_node.into_inner()),
828                metadata: self
829                    .location
830                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
831            },
832        )
833    }
834
835    /// Outputs everything in this stream that is *not* contained in the `other` stream.
836    ///
837    /// The `other` stream must be [`Bounded`], since this function will wait until
838    /// all its elements are available before producing any output.
839    /// # Example
840    /// ```rust
841    /// # #[cfg(feature = "deploy")] {
842    /// # use hydro_lang::prelude::*;
843    /// # use futures::StreamExt;
844    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845    /// let tick = process.tick();
846    /// let stream = process
847    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
848    ///   .batch(&tick, nondet!(/** test */));
849    /// let batch = process
850    ///   .source_iter(q!(vec![1, 2]))
851    ///   .batch(&tick, nondet!(/** test */));
852    /// stream.filter_not_in(batch).all_ticks()
853    /// # }, |mut stream| async move {
854    /// # for w in vec![3, 4] {
855    /// #     assert_eq!(stream.next().await.unwrap(), w);
856    /// # }
857    /// # }));
858    /// # }
859    /// ```
860    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
861    where
862        T: Eq + Hash,
863    {
864        check_matching_location(&self.location, &other.location);
865
866        Stream::new(
867            self.location.clone(),
868            HydroNode::Difference {
869                pos: Box::new(self.ir_node.into_inner()),
870                neg: Box::new(other.ir_node.into_inner()),
871                metadata: self
872                    .location
873                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
874            },
875        )
876    }
877
878    /// An operator which allows you to "inspect" each element of a stream without
879    /// modifying it. The closure `f` is called on a reference to each item. This is
880    /// mainly useful for debugging, and should not be used to generate side-effects.
881    ///
882    /// # Example
883    /// ```rust
884    /// # #[cfg(feature = "deploy")] {
885    /// # use hydro_lang::prelude::*;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888    /// let nums = process.source_iter(q!(vec![1, 2]));
889    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
890    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
891    /// # }, |mut stream| async move {
892    /// # for w in vec![1, 2] {
893    /// #     assert_eq!(stream.next().await.unwrap(), w);
894    /// # }
895    /// # }));
896    /// # }
897    /// ```
898    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
899    where
900        F: Fn(&T) + 'a,
901    {
902        let f = f.splice_fn1_borrow_ctx(&self.location).into();
903
904        Stream::new(
905            self.location.clone(),
906            HydroNode::Inspect {
907                f,
908                input: Box::new(self.ir_node.into_inner()),
909                metadata: self.location.new_node_metadata(Self::collection_kind()),
910            },
911        )
912    }
913
914    /// An operator which allows you to "name" a `HydroNode`.
915    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
916    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
917        {
918            let mut node = self.ir_node.borrow_mut();
919            let metadata = node.metadata_mut();
920            metadata.tag = Some(name.to_string());
921        }
922        self
923    }
924
925    /// Explicitly "casts" the stream to a type with a different ordering
926    /// guarantee. Useful in unsafe code where the ordering cannot be proven
927    /// by the type-system.
928    ///
929    /// # Non-Determinism
930    /// This function is used as an escape hatch, and any mistakes in the
931    /// provided ordering guarantee will propagate into the guarantees
932    /// for the rest of the program.
933    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
934        if O::ORDERING_KIND == O2::ORDERING_KIND {
935            Stream::new(self.location, self.ir_node.into_inner())
936        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
937            // We can always weaken the ordering guarantee
938            Stream::new(
939                self.location.clone(),
940                HydroNode::Cast {
941                    inner: Box::new(self.ir_node.into_inner()),
942                    metadata: self
943                        .location
944                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
945                },
946            )
947        } else {
948            Stream::new(
949                self.location.clone(),
950                HydroNode::ObserveNonDet {
951                    inner: Box::new(self.ir_node.into_inner()),
952                    trusted: false,
953                    metadata: self
954                        .location
955                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
956                },
957            )
958        }
959    }
960
961    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
962    // intermediate states will not be revealed
963    fn assume_ordering_trusted_bounded<O2: Ordering>(
964        self,
965        nondet: NonDet,
966    ) -> Stream<T, L, B, O2, R> {
967        if B::BOUNDED {
968            self.assume_ordering_trusted(nondet)
969        } else {
970            self.assume_ordering(nondet)
971        }
972    }
973
974    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
975    // is not observable
976    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
977        self,
978        _nondet: NonDet,
979    ) -> Stream<T, L, B, O2, R> {
980        if O::ORDERING_KIND == O2::ORDERING_KIND {
981            Stream::new(self.location, self.ir_node.into_inner())
982        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
983            // We can always weaken the ordering guarantee
984            Stream::new(
985                self.location.clone(),
986                HydroNode::Cast {
987                    inner: Box::new(self.ir_node.into_inner()),
988                    metadata: self
989                        .location
990                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
991                },
992            )
993        } else {
994            Stream::new(
995                self.location.clone(),
996                HydroNode::ObserveNonDet {
997                    inner: Box::new(self.ir_node.into_inner()),
998                    trusted: true,
999                    metadata: self
1000                        .location
1001                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1002                },
1003            )
1004        }
1005    }
1006
1007    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1008    /// which is always safe because that is the weakest possible guarantee.
1009    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1010        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1011        self.assume_ordering::<NoOrder>(nondet)
1012    }
1013
1014    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1015    /// enforcing that `O2` is weaker than the input ordering guarantee.
1016    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1017        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1018        self.assume_ordering::<O2>(nondet)
1019    }
1020
1021    /// Explicitly "casts" the stream to a type with a different retries
1022    /// guarantee. Useful in unsafe code where the lack of retries cannot
1023    /// be proven by the type-system.
1024    ///
1025    /// # Non-Determinism
1026    /// This function is used as an escape hatch, and any mistakes in the
1027    /// provided retries guarantee will propagate into the guarantees
1028    /// for the rest of the program.
1029    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1030        if R::RETRIES_KIND == R2::RETRIES_KIND {
1031            Stream::new(self.location, self.ir_node.into_inner())
1032        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1033            // We can always weaken the retries guarantee
1034            Stream::new(
1035                self.location.clone(),
1036                HydroNode::Cast {
1037                    inner: Box::new(self.ir_node.into_inner()),
1038                    metadata: self
1039                        .location
1040                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1041                },
1042            )
1043        } else {
1044            Stream::new(
1045                self.location.clone(),
1046                HydroNode::ObserveNonDet {
1047                    inner: Box::new(self.ir_node.into_inner()),
1048                    trusted: false,
1049                    metadata: self
1050                        .location
1051                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1052                },
1053            )
1054        }
1055    }
1056
1057    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1058    // is not observable
1059    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1060        if R::RETRIES_KIND == R2::RETRIES_KIND {
1061            Stream::new(self.location, self.ir_node.into_inner())
1062        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1063            // We can always weaken the retries guarantee
1064            Stream::new(
1065                self.location.clone(),
1066                HydroNode::Cast {
1067                    inner: Box::new(self.ir_node.into_inner()),
1068                    metadata: self
1069                        .location
1070                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1071                },
1072            )
1073        } else {
1074            Stream::new(
1075                self.location.clone(),
1076                HydroNode::ObserveNonDet {
1077                    inner: Box::new(self.ir_node.into_inner()),
1078                    trusted: true,
1079                    metadata: self
1080                        .location
1081                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1082                },
1083            )
1084        }
1085    }
1086
1087    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1088    /// which is always safe because that is the weakest possible guarantee.
1089    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1090        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1091        self.assume_retries::<AtLeastOnce>(nondet)
1092    }
1093
1094    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1095    /// enforcing that `R2` is weaker than the input retries guarantee.
1096    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1097        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1098        self.assume_retries::<R2>(nondet)
1099    }
1100}
1101
1102impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1103where
1104    L: Location<'a>,
1105{
1106    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1107    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1108    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1109        self.assume_retries(
1110            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1111        )
1112    }
1113}
1114
1115impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1116where
1117    L: Location<'a>,
1118{
1119    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1120    ///
1121    /// # Example
1122    /// ```rust
1123    /// # #[cfg(feature = "deploy")] {
1124    /// # use hydro_lang::prelude::*;
1125    /// # use futures::StreamExt;
1126    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1127    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1128    /// # }, |mut stream| async move {
1129    /// // 1, 2, 3
1130    /// # for w in vec![1, 2, 3] {
1131    /// #     assert_eq!(stream.next().await.unwrap(), w);
1132    /// # }
1133    /// # }));
1134    /// # }
1135    /// ```
1136    pub fn cloned(self) -> Stream<T, L, B, O, R>
1137    where
1138        T: Clone,
1139    {
1140        self.map(q!(|d| d.clone()))
1141    }
1142}
1143
1144impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1145where
1146    L: Location<'a>,
1147{
1148    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1149    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1150    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1151    ///
1152    /// Depending on the input stream guarantees, the closure may need to be commutative
1153    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1154    ///
1155    /// # Example
1156    /// ```rust
1157    /// # #[cfg(feature = "deploy")] {
1158    /// # use hydro_lang::prelude::*;
1159    /// # use futures::StreamExt;
1160    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1161    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1162    /// words
1163    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1164    ///     .into_stream()
1165    /// # }, |mut stream| async move {
1166    /// // "HELLOWORLD"
1167    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1168    /// # }));
1169    /// # }
1170    /// ```
1171    pub fn fold<A, I, F, C, Idemp>(
1172        self,
1173        init: impl IntoQuotedMut<'a, I, L>,
1174        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1175    ) -> Singleton<A, L, B>
1176    where
1177        I: Fn() -> A + 'a,
1178        F: Fn(&mut A, T),
1179        C: ValidCommutativityFor<O>,
1180        Idemp: ValidIdempotenceFor<R>,
1181    {
1182        let init = init.splice_fn0_ctx(&self.location).into();
1183        let comb = comb
1184            .splice_fn2_borrow_mut_ctx_props(&self.location)
1185            .0
1186            .into();
1187
1188        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1189        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1190
1191        let core = HydroNode::Fold {
1192            init,
1193            acc: comb,
1194            input: Box::new(ordered_etc.ir_node.into_inner()),
1195            metadata: ordered_etc
1196                .location
1197                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1198        };
1199
1200        Singleton::new(ordered_etc.location, core)
1201    }
1202
1203    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1204    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1205    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1206    /// reference, so that it can be modified in place.
1207    ///
1208    /// Depending on the input stream guarantees, the closure may need to be commutative
1209    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1210    ///
1211    /// # Example
1212    /// ```rust
1213    /// # #[cfg(feature = "deploy")] {
1214    /// # use hydro_lang::prelude::*;
1215    /// # use futures::StreamExt;
1216    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1217    /// let bools = process.source_iter(q!(vec![false, true, false]));
1218    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1219    /// # }, |mut stream| async move {
1220    /// // true
1221    /// # assert_eq!(stream.next().await.unwrap(), true);
1222    /// # }));
1223    /// # }
1224    /// ```
1225    pub fn reduce<F, C, Idemp>(
1226        self,
1227        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1228    ) -> Optional<T, L, B>
1229    where
1230        F: Fn(&mut T, T) + 'a,
1231        C: ValidCommutativityFor<O>,
1232        Idemp: ValidIdempotenceFor<R>,
1233    {
1234        let f = comb
1235            .splice_fn2_borrow_mut_ctx_props(&self.location)
1236            .0
1237            .into();
1238
1239        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1240        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1241
1242        let core = HydroNode::Reduce {
1243            f,
1244            input: Box::new(ordered_etc.ir_node.into_inner()),
1245            metadata: ordered_etc
1246                .location
1247                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1248        };
1249
1250        Optional::new(ordered_etc.location, core)
1251    }
1252
1253    /// Computes the maximum element in the stream as an [`Optional`], which
1254    /// will be empty until the first element in the input arrives.
1255    ///
1256    /// # Example
1257    /// ```rust
1258    /// # #[cfg(feature = "deploy")] {
1259    /// # use hydro_lang::prelude::*;
1260    /// # use futures::StreamExt;
1261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262    /// let tick = process.tick();
1263    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1264    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1265    /// batch.max().all_ticks()
1266    /// # }, |mut stream| async move {
1267    /// // 4
1268    /// # assert_eq!(stream.next().await.unwrap(), 4);
1269    /// # }));
1270    /// # }
1271    /// ```
1272    pub fn max(self) -> Optional<T, L, B>
1273    where
1274        T: Ord,
1275    {
1276        self.assume_retries_trusted(nondet!(/** max is idempotent */))
1277            .assume_ordering_trusted_bounded(
1278                nondet!(/** max is commutative, but order affects intermediates */),
1279            )
1280            .reduce(q!(|curr, new| {
1281                if new > *curr {
1282                    *curr = new;
1283                }
1284            }))
1285    }
1286
1287    /// Computes the minimum element in the stream as an [`Optional`], which
1288    /// will be empty until the first element in the input arrives.
1289    ///
1290    /// # Example
1291    /// ```rust
1292    /// # #[cfg(feature = "deploy")] {
1293    /// # use hydro_lang::prelude::*;
1294    /// # use futures::StreamExt;
1295    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296    /// let tick = process.tick();
1297    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1298    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1299    /// batch.min().all_ticks()
1300    /// # }, |mut stream| async move {
1301    /// // 1
1302    /// # assert_eq!(stream.next().await.unwrap(), 1);
1303    /// # }));
1304    /// # }
1305    /// ```
1306    pub fn min(self) -> Optional<T, L, B>
1307    where
1308        T: Ord,
1309    {
1310        self.assume_retries_trusted(nondet!(/** min is idempotent */))
1311            .assume_ordering_trusted_bounded(
1312                nondet!(/** max is commutative, but order affects intermediates */),
1313            )
1314            .reduce(q!(|curr, new| {
1315                if new < *curr {
1316                    *curr = new;
1317                }
1318            }))
1319    }
1320}
1321
1322impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1323where
1324    L: Location<'a>,
1325{
1326    /// Computes the number of elements in the stream as a [`Singleton`].
1327    ///
1328    /// # Example
1329    /// ```rust
1330    /// # #[cfg(feature = "deploy")] {
1331    /// # use hydro_lang::prelude::*;
1332    /// # use futures::StreamExt;
1333    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1334    /// let tick = process.tick();
1335    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1336    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1337    /// batch.count().all_ticks()
1338    /// # }, |mut stream| async move {
1339    /// // 4
1340    /// # assert_eq!(stream.next().await.unwrap(), 4);
1341    /// # }));
1342    /// # }
1343    /// ```
1344    pub fn count(self) -> Singleton<usize, L, B> {
1345        self.assume_ordering_trusted(nondet!(
1346            /// Order does not affect eventual count, and also does not affect intermediate states.
1347        ))
1348        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1349    }
1350}
1351
1352impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1353where
1354    L: Location<'a>,
1355{
1356    /// Computes the first element in the stream as an [`Optional`], which
1357    /// will be empty until the first element in the input arrives.
1358    ///
1359    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1360    /// re-ordering of elements may cause the first element to change.
1361    ///
1362    /// # Example
1363    /// ```rust
1364    /// # #[cfg(feature = "deploy")] {
1365    /// # use hydro_lang::prelude::*;
1366    /// # use futures::StreamExt;
1367    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1368    /// let tick = process.tick();
1369    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1370    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1371    /// batch.first().all_ticks()
1372    /// # }, |mut stream| async move {
1373    /// // 1
1374    /// # assert_eq!(stream.next().await.unwrap(), 1);
1375    /// # }));
1376    /// # }
1377    /// ```
1378    pub fn first(self) -> Optional<T, L, B> {
1379        self.assume_retries_trusted(nondet!(/** first is idempotent */))
1380            .reduce(q!(|_, _| {}))
1381    }
1382
1383    /// Computes the last element in the stream as an [`Optional`], which
1384    /// will be empty until an element in the input arrives.
1385    ///
1386    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1387    /// re-ordering of elements may cause the last element to change.
1388    ///
1389    /// # Example
1390    /// ```rust
1391    /// # #[cfg(feature = "deploy")] {
1392    /// # use hydro_lang::prelude::*;
1393    /// # use futures::StreamExt;
1394    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1395    /// let tick = process.tick();
1396    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1397    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1398    /// batch.last().all_ticks()
1399    /// # }, |mut stream| async move {
1400    /// // 4
1401    /// # assert_eq!(stream.next().await.unwrap(), 4);
1402    /// # }));
1403    /// # }
1404    /// ```
1405    pub fn last(self) -> Optional<T, L, B> {
1406        self.assume_retries_trusted(nondet!(/** last is idempotent */))
1407            .reduce(q!(|curr, new| *curr = new))
1408    }
1409}
1410
1411impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1412where
1413    L: Location<'a>,
1414{
1415    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1416    ///
1417    /// # Example
1418    /// ```rust
1419    /// # #[cfg(feature = "deploy")] {
1420    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1421    /// # use futures::StreamExt;
1422    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1423    /// let tick = process.tick();
1424    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1425    /// numbers.enumerate()
1426    /// # }, |mut stream| async move {
1427    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1428    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1429    /// #     assert_eq!(stream.next().await.unwrap(), w);
1430    /// # }
1431    /// # }));
1432    /// # }
1433    /// ```
1434    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1435        Stream::new(
1436            self.location.clone(),
1437            HydroNode::Enumerate {
1438                input: Box::new(self.ir_node.into_inner()),
1439                metadata: self.location.new_node_metadata(Stream::<
1440                    (usize, T),
1441                    L,
1442                    B,
1443                    TotalOrder,
1444                    ExactlyOnce,
1445                >::collection_kind()),
1446            },
1447        )
1448    }
1449
1450    /// Collects all the elements of this stream into a single [`Vec`] element.
1451    ///
1452    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1453    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1454    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1455    /// the vector at an arbitrary point in time.
1456    ///
1457    /// # Example
1458    /// ```rust
1459    /// # #[cfg(feature = "deploy")] {
1460    /// # use hydro_lang::prelude::*;
1461    /// # use futures::StreamExt;
1462    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1463    /// let tick = process.tick();
1464    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1465    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1466    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1467    /// # }, |mut stream| async move {
1468    /// // [ vec![1, 2, 3, 4] ]
1469    /// # for w in vec![vec![1, 2, 3, 4]] {
1470    /// #     assert_eq!(stream.next().await.unwrap(), w);
1471    /// # }
1472    /// # }));
1473    /// # }
1474    /// ```
1475    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1476        self.fold(
1477            q!(|| vec![]),
1478            q!(|acc, v| {
1479                acc.push(v);
1480            }),
1481        )
1482    }
1483
1484    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1485    /// and emitting each intermediate result.
1486    ///
1487    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1488    /// containing all intermediate accumulated values. The scan operation can also terminate early
1489    /// by returning `None`.
1490    ///
1491    /// The function takes a mutable reference to the accumulator and the current element, and returns
1492    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1493    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1494    ///
1495    /// # Examples
1496    ///
1497    /// Basic usage - running sum:
1498    /// ```rust
1499    /// # #[cfg(feature = "deploy")] {
1500    /// # use hydro_lang::prelude::*;
1501    /// # use futures::StreamExt;
1502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1503    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1504    ///     q!(|| 0),
1505    ///     q!(|acc, x| {
1506    ///         *acc += x;
1507    ///         Some(*acc)
1508    ///     }),
1509    /// )
1510    /// # }, |mut stream| async move {
1511    /// // Output: 1, 3, 6, 10
1512    /// # for w in vec![1, 3, 6, 10] {
1513    /// #     assert_eq!(stream.next().await.unwrap(), w);
1514    /// # }
1515    /// # }));
1516    /// # }
1517    /// ```
1518    ///
1519    /// Early termination example:
1520    /// ```rust
1521    /// # #[cfg(feature = "deploy")] {
1522    /// # use hydro_lang::prelude::*;
1523    /// # use futures::StreamExt;
1524    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1525    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1526    ///     q!(|| 1),
1527    ///     q!(|state, x| {
1528    ///         *state = *state * x;
1529    ///         if *state > 6 {
1530    ///             None // Terminate the stream
1531    ///         } else {
1532    ///             Some(-*state)
1533    ///         }
1534    ///     }),
1535    /// )
1536    /// # }, |mut stream| async move {
1537    /// // Output: -1, -2, -6
1538    /// # for w in vec![-1, -2, -6] {
1539    /// #     assert_eq!(stream.next().await.unwrap(), w);
1540    /// # }
1541    /// # }));
1542    /// # }
1543    /// ```
1544    pub fn scan<A, U, I, F>(
1545        self,
1546        init: impl IntoQuotedMut<'a, I, L>,
1547        f: impl IntoQuotedMut<'a, F, L>,
1548    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1549    where
1550        I: Fn() -> A + 'a,
1551        F: Fn(&mut A, T) -> Option<U> + 'a,
1552    {
1553        let init = init.splice_fn0_ctx(&self.location).into();
1554        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1555
1556        Stream::new(
1557            self.location.clone(),
1558            HydroNode::Scan {
1559                init,
1560                acc: f,
1561                input: Box::new(self.ir_node.into_inner()),
1562                metadata: self.location.new_node_metadata(
1563                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1564                ),
1565            },
1566        )
1567    }
1568}
1569
1570impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1571    /// Produces a new stream that interleaves the elements of the two input streams.
1572    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1573    ///
1574    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1575    /// [`Bounded`], you can use [`Stream::chain`] instead.
1576    ///
1577    /// # Example
1578    /// ```rust
1579    /// # #[cfg(feature = "deploy")] {
1580    /// # use hydro_lang::prelude::*;
1581    /// # use futures::StreamExt;
1582    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1583    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1584    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1585    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1586    /// # }, |mut stream| async move {
1587    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1588    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1589    /// #     assert_eq!(stream.next().await.unwrap(), w);
1590    /// # }
1591    /// # }));
1592    /// # }
1593    /// ```
1594    pub fn interleave<O2: Ordering, R2: Retries>(
1595        self,
1596        other: Stream<T, L, Unbounded, O2, R2>,
1597    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1598    where
1599        R: MinRetries<R2>,
1600    {
1601        Stream::new(
1602            self.location.clone(),
1603            HydroNode::Chain {
1604                first: Box::new(self.ir_node.into_inner()),
1605                second: Box::new(other.ir_node.into_inner()),
1606                metadata: self.location.new_node_metadata(Stream::<
1607                    T,
1608                    L,
1609                    Unbounded,
1610                    NoOrder,
1611                    <R as MinRetries<R2>>::Min,
1612                >::collection_kind()),
1613            },
1614        )
1615    }
1616}
1617
1618impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1619    /// Produces a new stream that combines the elements of the two input streams,
1620    /// preserving the relative order of elements within each input.
1621    ///
1622    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1623    /// [`Bounded`], you can use [`Stream::chain`] instead.
1624    ///
1625    /// # Non-Determinism
1626    /// The order in which elements *across* the two streams will be interleaved is
1627    /// non-deterministic, so the order of elements will vary across runs. If the output order
1628    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1629    /// unordered stream.
1630    ///
1631    /// # Example
1632    /// ```rust
1633    /// # #[cfg(feature = "deploy")] {
1634    /// # use hydro_lang::prelude::*;
1635    /// # use futures::StreamExt;
1636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1637    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1638    /// # process.source_iter(q!(vec![1, 3])).into();
1639    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1640    /// # }, |mut stream| async move {
1641    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1642    /// # for w in vec![1, 3, 2, 4] {
1643    /// #     assert_eq!(stream.next().await.unwrap(), w);
1644    /// # }
1645    /// # }));
1646    /// # }
1647    /// ```
1648    pub fn merge_ordered<R2: Retries>(
1649        self,
1650        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1651        _nondet: NonDet,
1652    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1653    where
1654        R: MinRetries<R2>,
1655    {
1656        Stream::new(
1657            self.location.clone(),
1658            HydroNode::Chain {
1659                first: Box::new(self.ir_node.into_inner()),
1660                second: Box::new(other.ir_node.into_inner()),
1661                metadata: self.location.new_node_metadata(Stream::<
1662                    T,
1663                    L,
1664                    Unbounded,
1665                    TotalOrder,
1666                    <R as MinRetries<R2>>::Min,
1667                >::collection_kind()),
1668            },
1669        )
1670    }
1671}
1672
1673impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1674where
1675    L: Location<'a>,
1676{
1677    /// Produces a new stream that emits the input elements in sorted order.
1678    ///
1679    /// The input stream can have any ordering guarantee, but the output stream
1680    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1681    /// elements in the input stream are available, so it requires the input stream
1682    /// to be [`Bounded`].
1683    ///
1684    /// # Example
1685    /// ```rust
1686    /// # #[cfg(feature = "deploy")] {
1687    /// # use hydro_lang::prelude::*;
1688    /// # use futures::StreamExt;
1689    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1690    /// let tick = process.tick();
1691    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1692    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1693    /// batch.sort().all_ticks()
1694    /// # }, |mut stream| async move {
1695    /// // 1, 2, 3, 4
1696    /// # for w in (1..5) {
1697    /// #     assert_eq!(stream.next().await.unwrap(), w);
1698    /// # }
1699    /// # }));
1700    /// # }
1701    /// ```
1702    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1703    where
1704        T: Ord,
1705    {
1706        Stream::new(
1707            self.location.clone(),
1708            HydroNode::Sort {
1709                input: Box::new(self.ir_node.into_inner()),
1710                metadata: self
1711                    .location
1712                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1713            },
1714        )
1715    }
1716
1717    /// Produces a new stream that first emits the elements of the `self` stream,
1718    /// and then emits the elements of the `other` stream. The output stream has
1719    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1720    /// [`TotalOrder`] guarantee.
1721    ///
1722    /// Currently, both input streams must be [`Bounded`]. This operator will block
1723    /// on the first stream until all its elements are available. In a future version,
1724    /// we will relax the requirement on the `other` stream.
1725    ///
1726    /// # Example
1727    /// ```rust
1728    /// # #[cfg(feature = "deploy")] {
1729    /// # use hydro_lang::prelude::*;
1730    /// # use futures::StreamExt;
1731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1732    /// let tick = process.tick();
1733    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1734    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1735    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1736    /// # }, |mut stream| async move {
1737    /// // 2, 3, 4, 5, 1, 2, 3, 4
1738    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1739    /// #     assert_eq!(stream.next().await.unwrap(), w);
1740    /// # }
1741    /// # }));
1742    /// # }
1743    /// ```
1744    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1745        self,
1746        other: Stream<T, L, B2, O2, R2>,
1747    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1748    where
1749        O: MinOrder<O2>,
1750        R: MinRetries<R2>,
1751    {
1752        check_matching_location(&self.location, &other.location);
1753
1754        Stream::new(
1755            self.location.clone(),
1756            HydroNode::Chain {
1757                first: Box::new(self.ir_node.into_inner()),
1758                second: Box::new(other.ir_node.into_inner()),
1759                metadata: self.location.new_node_metadata(Stream::<
1760                    T,
1761                    L,
1762                    B2,
1763                    <O as MinOrder<O2>>::Min,
1764                    <R as MinRetries<R2>>::Min,
1765                >::collection_kind()),
1766            },
1767        )
1768    }
1769
1770    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1771    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1772    /// because this is compiled into a nested loop.
1773    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1774        self,
1775        other: Stream<T2, L, Bounded, O2, R>,
1776    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1777    where
1778        T: Clone,
1779        T2: Clone,
1780    {
1781        check_matching_location(&self.location, &other.location);
1782
1783        Stream::new(
1784            self.location.clone(),
1785            HydroNode::CrossProduct {
1786                left: Box::new(self.ir_node.into_inner()),
1787                right: Box::new(other.ir_node.into_inner()),
1788                metadata: self.location.new_node_metadata(Stream::<
1789                    (T, T2),
1790                    L,
1791                    Bounded,
1792                    <O2 as MinOrder<O>>::Min,
1793                    R,
1794                >::collection_kind()),
1795            },
1796        )
1797    }
1798
1799    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1800    /// `self` used as the values for *each* key.
1801    ///
1802    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1803    /// values. For example, it can be used to send the same set of elements to several cluster
1804    /// members, if the membership information is available as a [`KeyedSingleton`].
1805    ///
1806    /// # Example
1807    /// ```rust
1808    /// # #[cfg(feature = "deploy")] {
1809    /// # use hydro_lang::prelude::*;
1810    /// # use futures::StreamExt;
1811    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812    /// # let tick = process.tick();
1813    /// let keyed_singleton = // { 1: (), 2: () }
1814    /// # process
1815    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1816    /// #     .into_keyed()
1817    /// #     .batch(&tick, nondet!(/** test */))
1818    /// #     .first();
1819    /// let stream = // [ "a", "b" ]
1820    /// # process
1821    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1822    /// #     .batch(&tick, nondet!(/** test */));
1823    /// stream.repeat_with_keys(keyed_singleton)
1824    /// # .entries().all_ticks()
1825    /// # }, |mut stream| async move {
1826    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1827    /// # let mut results = Vec::new();
1828    /// # for _ in 0..4 {
1829    /// #     results.push(stream.next().await.unwrap());
1830    /// # }
1831    /// # results.sort();
1832    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1833    /// # }));
1834    /// # }
1835    /// ```
1836    pub fn repeat_with_keys<K, V2>(
1837        self,
1838        keys: KeyedSingleton<K, V2, L, Bounded>,
1839    ) -> KeyedStream<K, T, L, Bounded, O, R>
1840    where
1841        K: Clone,
1842        T: Clone,
1843    {
1844        keys.keys()
1845            .weaken_retries()
1846            .assume_ordering_trusted::<TotalOrder>(
1847                nondet!(/** keyed stream does not depend on ordering of keys */),
1848            )
1849            .cross_product_nested_loop(self)
1850            .into_keyed()
1851    }
1852}
1853
1854impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1855where
1856    L: Location<'a>,
1857{
1858    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1859    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1860    /// by equi-joining the two streams on the key attribute `K`.
1861    ///
1862    /// # Example
1863    /// ```rust
1864    /// # #[cfg(feature = "deploy")] {
1865    /// # use hydro_lang::prelude::*;
1866    /// # use std::collections::HashSet;
1867    /// # use futures::StreamExt;
1868    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1869    /// let tick = process.tick();
1870    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1871    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1872    /// stream1.join(stream2)
1873    /// # }, |mut stream| async move {
1874    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1875    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1876    /// # stream.map(|i| assert!(expected.contains(&i)));
1877    /// # }));
1878    /// # }
1879    pub fn join<V2, O2: Ordering, R2: Retries>(
1880        self,
1881        n: Stream<(K, V2), L, B, O2, R2>,
1882    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1883    where
1884        K: Eq + Hash,
1885        R: MinRetries<R2>,
1886    {
1887        check_matching_location(&self.location, &n.location);
1888
1889        Stream::new(
1890            self.location.clone(),
1891            HydroNode::Join {
1892                left: Box::new(self.ir_node.into_inner()),
1893                right: Box::new(n.ir_node.into_inner()),
1894                metadata: self.location.new_node_metadata(Stream::<
1895                    (K, (V1, V2)),
1896                    L,
1897                    B,
1898                    NoOrder,
1899                    <R as MinRetries<R2>>::Min,
1900                >::collection_kind()),
1901            },
1902        )
1903    }
1904
1905    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1906    /// computes the anti-join of the items in the input -- i.e. returns
1907    /// unique items in the first input that do not have a matching key
1908    /// in the second input.
1909    ///
1910    /// # Example
1911    /// ```rust
1912    /// # #[cfg(feature = "deploy")] {
1913    /// # use hydro_lang::prelude::*;
1914    /// # use futures::StreamExt;
1915    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1916    /// let tick = process.tick();
1917    /// let stream = process
1918    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1919    ///   .batch(&tick, nondet!(/** test */));
1920    /// let batch = process
1921    ///   .source_iter(q!(vec![1, 2]))
1922    ///   .batch(&tick, nondet!(/** test */));
1923    /// stream.anti_join(batch).all_ticks()
1924    /// # }, |mut stream| async move {
1925    /// # for w in vec![(3, 'c'), (4, 'd')] {
1926    /// #     assert_eq!(stream.next().await.unwrap(), w);
1927    /// # }
1928    /// # }));
1929    /// # }
1930    pub fn anti_join<O2: Ordering, R2: Retries>(
1931        self,
1932        n: Stream<K, L, Bounded, O2, R2>,
1933    ) -> Stream<(K, V1), L, B, O, R>
1934    where
1935        K: Eq + Hash,
1936    {
1937        check_matching_location(&self.location, &n.location);
1938
1939        Stream::new(
1940            self.location.clone(),
1941            HydroNode::AntiJoin {
1942                pos: Box::new(self.ir_node.into_inner()),
1943                neg: Box::new(n.ir_node.into_inner()),
1944                metadata: self
1945                    .location
1946                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1947            },
1948        )
1949    }
1950}
1951
1952impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1953    Stream<(K, V), L, B, O, R>
1954{
1955    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1956    /// is used as the key and the second element is added to the entries associated with that key.
1957    ///
1958    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1959    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1960    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1961    /// total ordering _within_ each group but no ordering _across_ groups.
1962    ///
1963    /// # Example
1964    /// ```rust
1965    /// # #[cfg(feature = "deploy")] {
1966    /// # use hydro_lang::prelude::*;
1967    /// # use futures::StreamExt;
1968    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1969    /// process
1970    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1971    ///     .into_keyed()
1972    /// #   .entries()
1973    /// # }, |mut stream| async move {
1974    /// // { 1: [2, 3], 2: [4] }
1975    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1976    /// #     assert_eq!(stream.next().await.unwrap(), w);
1977    /// # }
1978    /// # }));
1979    /// # }
1980    /// ```
1981    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1982        KeyedStream::new(
1983            self.location.clone(),
1984            HydroNode::Cast {
1985                inner: Box::new(self.ir_node.into_inner()),
1986                metadata: self
1987                    .location
1988                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1989            },
1990        )
1991    }
1992}
1993
1994impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1995where
1996    K: Eq + Hash,
1997    L: Location<'a>,
1998{
1999    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2000    /// # Example
2001    /// ```rust
2002    /// # #[cfg(feature = "deploy")] {
2003    /// # use hydro_lang::prelude::*;
2004    /// # use futures::StreamExt;
2005    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006    /// let tick = process.tick();
2007    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2008    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2009    /// batch.keys().all_ticks()
2010    /// # }, |mut stream| async move {
2011    /// // 1, 2
2012    /// # assert_eq!(stream.next().await.unwrap(), 1);
2013    /// # assert_eq!(stream.next().await.unwrap(), 2);
2014    /// # }));
2015    /// # }
2016    /// ```
2017    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2018        self.into_keyed()
2019            .fold(
2020                q!(|| ()),
2021                q!(
2022                    |_, _| {},
2023                    commutative = ManualProof(/* values are ignored */),
2024                    idempotent = ManualProof(/* values are ignored */)
2025                ),
2026            )
2027            .keys()
2028    }
2029}
2030
2031impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2032where
2033    L: Location<'a> + NoTick,
2034{
2035    /// Returns a stream corresponding to the latest batch of elements being atomically
2036    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2037    /// the order of the input.
2038    ///
2039    /// # Non-Determinism
2040    /// The batch boundaries are non-deterministic and may change across executions.
2041    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2042        Stream::new(
2043            self.location.clone().tick,
2044            HydroNode::Batch {
2045                inner: Box::new(self.ir_node.into_inner()),
2046                metadata: self
2047                    .location
2048                    .tick
2049                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2050            },
2051        )
2052    }
2053
2054    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2055    /// See [`Stream::atomic`] for more details.
2056    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2057        Stream::new(
2058            self.location.tick.l.clone(),
2059            HydroNode::EndAtomic {
2060                inner: Box::new(self.ir_node.into_inner()),
2061                metadata: self
2062                    .location
2063                    .tick
2064                    .l
2065                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2066            },
2067        )
2068    }
2069}
2070
2071impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2072where
2073    L: Location<'a>,
2074{
2075    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2076    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2077    ///
2078    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2079    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2080    /// argument that declares where the stream will be atomically processed. Batching a stream into
2081    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2082    /// [`Tick`] will introduce asynchrony.
2083    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2084        let out_location = Atomic { tick: tick.clone() };
2085        Stream::new(
2086            out_location.clone(),
2087            HydroNode::BeginAtomic {
2088                inner: Box::new(self.ir_node.into_inner()),
2089                metadata: out_location
2090                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2091            },
2092        )
2093    }
2094
2095    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2096    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2097    /// the order of the input. The output stream will execute in the [`Tick`] that was
2098    /// used to create the atomic section.
2099    ///
2100    /// # Non-Determinism
2101    /// The batch boundaries are non-deterministic and may change across executions.
2102    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2103        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2104        Stream::new(
2105            tick.clone(),
2106            HydroNode::Batch {
2107                inner: Box::new(self.ir_node.into_inner()),
2108                metadata: tick
2109                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2110            },
2111        )
2112    }
2113
2114    /// Given a time interval, returns a stream corresponding to samples taken from the
2115    /// stream roughly at that interval. The output will have elements in the same order
2116    /// as the input, but with arbitrary elements skipped between samples. There is also
2117    /// no guarantee on the exact timing of the samples.
2118    ///
2119    /// # Non-Determinism
2120    /// The output stream is non-deterministic in which elements are sampled, since this
2121    /// is controlled by a clock.
2122    pub fn sample_every(
2123        self,
2124        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2125        nondet: NonDet,
2126    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2127    where
2128        L: NoTick + NoAtomic,
2129    {
2130        let samples = self.location.source_interval(interval, nondet);
2131
2132        let tick = self.location.tick();
2133        self.batch(&tick, nondet)
2134            .filter_if_some(samples.batch(&tick, nondet).first())
2135            .all_ticks()
2136            .weakest_retries()
2137    }
2138
2139    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2140    /// stream has not emitted a value since that duration.
2141    ///
2142    /// # Non-Determinism
2143    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2144    /// samples take place, timeouts may be non-deterministically generated or missed,
2145    /// and the notification of the timeout may be delayed as well. There is also no
2146    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2147    /// detected based on when the next sample is taken.
2148    pub fn timeout(
2149        self,
2150        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2151        nondet: NonDet,
2152    ) -> Optional<(), L, Unbounded>
2153    where
2154        L: NoTick + NoAtomic,
2155    {
2156        let tick = self.location.tick();
2157
2158        let latest_received = self.assume_retries(nondet).fold(
2159            q!(|| None),
2160            q!(
2161                |latest, _| {
2162                    *latest = Some(Instant::now());
2163                },
2164                commutative = ManualProof(/* TODO */)
2165            ),
2166        );
2167
2168        latest_received
2169            .snapshot(&tick, nondet)
2170            .filter_map(q!(move |latest_received| {
2171                if let Some(latest_received) = latest_received {
2172                    if Instant::now().duration_since(latest_received) > duration {
2173                        Some(())
2174                    } else {
2175                        None
2176                    }
2177                } else {
2178                    Some(())
2179                }
2180            }))
2181            .latest()
2182    }
2183}
2184
2185impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2186where
2187    L: Location<'a> + NoTick + NoAtomic,
2188    F: Future<Output = T>,
2189{
2190    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2191    /// Future outputs are produced as available, regardless of input arrival order.
2192    ///
2193    /// # Example
2194    /// ```rust
2195    /// # #[cfg(feature = "deploy")] {
2196    /// # use std::collections::HashSet;
2197    /// # use futures::StreamExt;
2198    /// # use hydro_lang::prelude::*;
2199    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2200    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2201    ///     .map(q!(|x| async move {
2202    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2203    ///         x
2204    ///     }))
2205    ///     .resolve_futures()
2206    /// #   },
2207    /// #   |mut stream| async move {
2208    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2209    /// #       let mut output = HashSet::new();
2210    /// #       for _ in 1..10 {
2211    /// #           output.insert(stream.next().await.unwrap());
2212    /// #       }
2213    /// #       assert_eq!(
2214    /// #           output,
2215    /// #           HashSet::<i32>::from_iter(1..10)
2216    /// #       );
2217    /// #   },
2218    /// # ));
2219    /// # }
2220    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2221        Stream::new(
2222            self.location.clone(),
2223            HydroNode::ResolveFutures {
2224                input: Box::new(self.ir_node.into_inner()),
2225                metadata: self
2226                    .location
2227                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2228            },
2229        )
2230    }
2231
2232    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2233    /// Future outputs are produced in the same order as the input stream.
2234    ///
2235    /// # Example
2236    /// ```rust
2237    /// # #[cfg(feature = "deploy")] {
2238    /// # use std::collections::HashSet;
2239    /// # use futures::StreamExt;
2240    /// # use hydro_lang::prelude::*;
2241    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2242    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2243    ///     .map(q!(|x| async move {
2244    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2245    ///         x
2246    ///     }))
2247    ///     .resolve_futures_ordered()
2248    /// #   },
2249    /// #   |mut stream| async move {
2250    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2251    /// #       let mut output = Vec::new();
2252    /// #       for _ in 1..10 {
2253    /// #           output.push(stream.next().await.unwrap());
2254    /// #       }
2255    /// #       assert_eq!(
2256    /// #           output,
2257    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2258    /// #       );
2259    /// #   },
2260    /// # ));
2261    /// # }
2262    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2263        Stream::new(
2264            self.location.clone(),
2265            HydroNode::ResolveFuturesOrdered {
2266                input: Box::new(self.ir_node.into_inner()),
2267                metadata: self
2268                    .location
2269                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2270            },
2271        )
2272    }
2273}
2274
2275impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2276where
2277    L: Location<'a> + NoTick,
2278{
2279    /// Executes the provided closure for every element in this stream.
2280    ///
2281    /// Because the closure may have side effects, the stream must have deterministic order
2282    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2283    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2284    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2285    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2286        let f = f.splice_fn1_ctx(&self.location).into();
2287        self.location
2288            .flow_state()
2289            .borrow_mut()
2290            .push_root(HydroRoot::ForEach {
2291                input: Box::new(self.ir_node.into_inner()),
2292                f,
2293                op_metadata: HydroIrOpMetadata::new(),
2294            });
2295    }
2296
2297    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2298    /// TCP socket to some other server. You should _not_ use this API for interacting with
2299    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2300    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2301    /// interaction with asynchronous sinks.
2302    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2303    where
2304        S: 'a + futures::Sink<T> + Unpin,
2305    {
2306        self.location
2307            .flow_state()
2308            .borrow_mut()
2309            .push_root(HydroRoot::DestSink {
2310                sink: sink.splice_typed_ctx(&self.location).into(),
2311                input: Box::new(self.ir_node.into_inner()),
2312                op_metadata: HydroIrOpMetadata::new(),
2313            });
2314    }
2315}
2316
2317impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2318where
2319    L: Location<'a>,
2320{
2321    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2322    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2323    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2324        Stream::new(
2325            self.location.outer().clone(),
2326            HydroNode::YieldConcat {
2327                inner: Box::new(self.ir_node.into_inner()),
2328                metadata: self
2329                    .location
2330                    .outer()
2331                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2332            },
2333        )
2334    }
2335
2336    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2337    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2338    ///
2339    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2340    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2341    /// stream's [`Tick`] context.
2342    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2343        let out_location = Atomic {
2344            tick: self.location.clone(),
2345        };
2346
2347        Stream::new(
2348            out_location.clone(),
2349            HydroNode::YieldConcat {
2350                inner: Box::new(self.ir_node.into_inner()),
2351                metadata: out_location
2352                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2353            },
2354        )
2355    }
2356
2357    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2358    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2359    /// input.
2360    ///
2361    /// This API is particularly useful for stateful computation on batches of data, such as
2362    /// maintaining an accumulated state that is up to date with the current batch.
2363    ///
2364    /// # Example
2365    /// ```rust
2366    /// # #[cfg(feature = "deploy")] {
2367    /// # use hydro_lang::prelude::*;
2368    /// # use futures::StreamExt;
2369    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2370    /// let tick = process.tick();
2371    /// # // ticks are lazy by default, forces the second tick to run
2372    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2373    /// # let batch_first_tick = process
2374    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2375    /// #  .batch(&tick, nondet!(/** test */));
2376    /// # let batch_second_tick = process
2377    /// #   .source_iter(q!(vec![5, 6, 7]))
2378    /// #   .batch(&tick, nondet!(/** test */))
2379    /// #   .defer_tick(); // appears on the second tick
2380    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2381    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2382    ///
2383    /// input.batch(&tick, nondet!(/** test */))
2384    ///     .across_ticks(|s| s.count()).all_ticks()
2385    /// # }, |mut stream| async move {
2386    /// // [4, 7]
2387    /// assert_eq!(stream.next().await.unwrap(), 4);
2388    /// assert_eq!(stream.next().await.unwrap(), 7);
2389    /// # }));
2390    /// # }
2391    /// ```
2392    pub fn across_ticks<Out: BatchAtomic>(
2393        self,
2394        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2395    ) -> Out::Batched {
2396        thunk(self.all_ticks_atomic()).batched_atomic()
2397    }
2398
2399    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2400    /// always has the elements of `self` at tick `T - 1`.
2401    ///
2402    /// At tick `0`, the output stream is empty, since there is no previous tick.
2403    ///
2404    /// This operator enables stateful iterative processing with ticks, by sending data from one
2405    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2406    ///
2407    /// # Example
2408    /// ```rust
2409    /// # #[cfg(feature = "deploy")] {
2410    /// # use hydro_lang::prelude::*;
2411    /// # use futures::StreamExt;
2412    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2413    /// let tick = process.tick();
2414    /// // ticks are lazy by default, forces the second tick to run
2415    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2416    ///
2417    /// let batch_first_tick = process
2418    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2419    ///   .batch(&tick, nondet!(/** test */));
2420    /// let batch_second_tick = process
2421    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2422    ///   .batch(&tick, nondet!(/** test */))
2423    ///   .defer_tick(); // appears on the second tick
2424    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2425    ///
2426    /// changes_across_ticks.clone().filter_not_in(
2427    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2428    /// ).all_ticks()
2429    /// # }, |mut stream| async move {
2430    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2431    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2432    /// #     assert_eq!(stream.next().await.unwrap(), w);
2433    /// # }
2434    /// # }));
2435    /// # }
2436    /// ```
2437    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2438        Stream::new(
2439            self.location.clone(),
2440            HydroNode::DeferTick {
2441                input: Box::new(self.ir_node.into_inner()),
2442                metadata: self
2443                    .location
2444                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2445            },
2446        )
2447    }
2448}
2449
2450#[cfg(test)]
2451mod tests {
2452    #[cfg(feature = "deploy")]
2453    use futures::{SinkExt, StreamExt};
2454    #[cfg(feature = "deploy")]
2455    use hydro_deploy::Deployment;
2456    #[cfg(feature = "deploy")]
2457    use serde::{Deserialize, Serialize};
2458    #[cfg(feature = "deploy")]
2459    use stageleft::q;
2460
2461    #[cfg(any(feature = "deploy", feature = "sim"))]
2462    use crate::compile::builder::FlowBuilder;
2463    #[cfg(feature = "deploy")]
2464    use crate::live_collections::sliced::sliced;
2465    #[cfg(feature = "deploy")]
2466    use crate::live_collections::stream::ExactlyOnce;
2467    #[cfg(feature = "sim")]
2468    use crate::live_collections::stream::NoOrder;
2469    #[cfg(any(feature = "deploy", feature = "sim"))]
2470    use crate::live_collections::stream::TotalOrder;
2471    #[cfg(any(feature = "deploy", feature = "sim"))]
2472    use crate::location::Location;
2473    #[cfg(any(feature = "deploy", feature = "sim"))]
2474    use crate::nondet::nondet;
2475
2476    mod backtrace_chained_ops;
2477
/// Type tag for the first process in `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P1;
/// Type tag for the second process (and the external) in `first_ten_distributed`.
#[cfg(feature = "deploy")]
struct P2;
2482
/// Serializable payload sent between processes in `first_ten_distributed`.
#[cfg(feature = "deploy")]
#[derive(Debug, Deserialize, Serialize)]
struct SendOverNetwork {
    /// The number carried by this message.
    n: u32,
}
2488
/// Sends `0..10` from one process to another over TCP (bincode), forwards the values to an
/// external port, and checks that they arrive in order.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_ten_distributed() {
    use crate::networking::TCP;

    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let sender = builder.process::<P1>();
    let receiver = builder.process::<P2>();
    let client = builder.external::<P2>();

    let out_port = sender
        .source_iter(q!(0..10))
        .map(q!(|n| SendOverNetwork { n }))
        .send(&receiver, TCP.bincode())
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&sender, deployment.Localhost())
        .with_process(&receiver, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect before starting so the output port is attached when the flow begins.
    let mut received = deployed.connect(out_port).await;

    deployment.start().await.unwrap();

    for expected in 0..10 {
        let message = received.next().await.unwrap();
        assert_eq!(message.n, expected);
    }
}
2523
/// Takes `first()` of a three-element stream inside a tick and checks that counting the
/// result yields `1`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn first_cardinality() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let tick = process.tick();
    let first_element = tick
        .singleton(q!([1, 2, 3]))
        .into_stream()
        .flatten_ordered()
        .first();
    let count_port = first_element
        .into_stream()
        .count()
        .all_ticks()
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut counts = deployed.connect(count_port).await;

    deployment.start().await.unwrap();

    assert_eq!(counts.next().await.unwrap(), 1);
}
2557
/// Feeds `1` then `2` into a top-level `reduce` that sums its input and checks that the
/// sampled result goes `1`, then `3`, i.e. the accumulator persists between inputs.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_reduce_remembers_state() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) = process.source_external_bincode(&client);
    let sum_port = input
        .reduce(q!(|acc, v| *acc += v))
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut sums = deployed.connect(sum_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(sums.next().await.unwrap(), 1);

    to_process.send(2).await.unwrap();
    assert_eq!(sums.next().await.unwrap(), 3);
}
2591
/// Crosses an external input stream with the fold (sum) of a bounded top-level source and
/// checks that every input is paired with the full sum `6`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_cross_singleton() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let bounded_sum = process
        .source_iter(q!(vec![1, 2, 3]))
        .fold(q!(|| 0), q!(|acc, v| *acc += v));
    let pairs_port = input
        .cross_singleton(bounded_sum)
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut pairs = deployed.connect(pairs_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (1, 6));

    to_process.send(2).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (2, 6));
}
2629
/// Inside `sliced!`, crosses each external input with the element count of a bounded
/// top-level `reduce` result and checks that the count is `1` for every input.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_reduce_cardinality() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let node = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let pairs_port = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    }
    .send_bincode_external(&client);

    let deployed = builder
        .with_process(&node, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut pairs = deployed.connect(pairs_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (1, 1));

    to_process.send(2).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (2, 1));
}
2667
/// Same shape as `top_level_bounded_reduce_cardinality`, but the bounded `reduce` result is
/// first converted with `into_singleton()`; the crossed count must still be `1` per input.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn top_level_bounded_into_singleton_cardinality() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let node = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);

    let pairs_port = sliced! {
        let input = use(input, nondet!(/** test */));
        let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
        input.cross_singleton(v.into_stream().count())
    }
    .send_bincode_external(&client);

    let deployed = builder
        .with_process(&node, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut pairs = deployed.connect(pairs_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (1, 1));

    to_process.send(2).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (2, 1));
}
2705
/// Crosses batched external input with a snapshot of an atomic fold over a bounded source
/// and checks that each input, sent separately, is paired with the full sum `6`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn atomic_fold_replays_each_tick() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
    let tick = process.tick();

    // Build the batch first, then the fold, to keep the operator construction order.
    let batched_input = input.batch(&tick, nondet!(/** test */));
    let folded_snapshot = process
        .source_iter(q!(vec![1, 2, 3]))
        .atomic(&tick)
        .fold(q!(|| 0), q!(|acc, v| *acc += v))
        .snapshot_atomic(nondet!(/** test */));
    let pairs_port = batched_input
        .cross_singleton(folded_snapshot)
        .all_ticks()
        .send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut pairs = deployed.connect(pairs_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (1, 6));

    to_process.send(2).await.unwrap();
    assert_eq!(pairs.next().await.unwrap(), (2, 6));
}
2748
/// Feeds `1` then `2` into a top-level running-sum `scan` and checks the outputs are `1`
/// and `3`, i.e. the scan accumulator persists between inputs.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_scan_remembers_state() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) = process.source_external_bincode(&client);
    let running_sum = input.scan(
        q!(|| 0),
        q!(|acc, v| {
            *acc += v;
            Some(*acc)
        }),
    );
    let sum_port = running_sum.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut sums = deployed.connect(sum_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(sums.next().await.unwrap(), 1);

    to_process.send(2).await.unwrap();
    assert_eq!(sums.next().await.unwrap(), 3);
}
2787
/// Feeds two values into a top-level `enumerate` and checks the indices are `0` then `1`,
/// i.e. the counter keeps going between inputs.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_enumerate_remembers_state() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) = process.source_external_bincode(&client);
    let indexed_port = input.enumerate().send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut indexed = deployed.connect(indexed_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(indexed.next().await.unwrap(), (0, 1));

    to_process.send(2).await.unwrap();
    assert_eq!(indexed.next().await.unwrap(), (1, 2));
}
2818
/// Sends `1, 2, 1, 3` through a top-level `unique` and checks that the repeated `1` is
/// suppressed: the outputs observed are `1`, `2`, then `3`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unbounded_unique_remembers_state() {
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (input_port, input) =
        process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
    let distinct_port = input.unique().send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut to_process = deployed.connect(input_port).await;
    let mut distinct = deployed.connect(distinct_port).await;

    deployment.start().await.unwrap();

    to_process.send(1).await.unwrap();
    assert_eq!(distinct.next().await.unwrap(), 1);

    to_process.send(2).await.unwrap();
    assert_eq!(distinct.next().await.unwrap(), 2);

    // The duplicate `1` must not be emitted, so the next output is `3`.
    to_process.send(1).await.unwrap();
    to_process.send(3).await.unwrap();
    assert_eq!(distinct.next().await.unwrap(), 3);
}
2854
/// Expected to panic: asserting that the first per-batch count is `3` does not hold under
/// nondeterministic batching, which the exhaustive simulation explores.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, TotalOrder, _>();

    let tick = process.tick();
    let batch_counts = input.batch(&tick, nondet!(/** test */)).count();
    let receiver = batch_counts.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        sender.send(());
        sender.send(());
        sender.send(());

        // fails with nondet batching
        assert_eq!(receiver.next().await.unwrap(), 3);
    });
}
2879
/// Batching an input and re-emitting it with `all_ticks` must yield exactly `[1, 2, 3]` in
/// order in every execution the simulator explores.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_preserves_order() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input();

    let tick = process.tick();
    let batched = input.batch(&tick, nondet!(/** test */));
    let receiver = batched.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        sender.send(1);
        sender.send(2);
        sender.send(3);

        receiver.assert_yields_only([1, 2, 3]).await;
    });
}
2902
/// Expected to panic: with a `NoOrder` input, some explored execution must produce the
/// per-batch `(min, max)` pairs `(1, 3)` then `(2, 2)`, which triggers the explicit panic.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_unordered_shuffles() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batch = input.batch(&tick, nondet!(/** test */));
    let smallest = batch.clone().min();
    let largest = batch.max();
    let receiver = smallest.zip(largest).all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3]);

        let observed = receiver.collect::<Vec<_>>().await;
        if observed == vec![(1, 3), (2, 2)] {
            panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
        }
    });
}
2929
/// Counts how many executions the exhaustive simulator explores when four unordered inputs
/// are batched and re-emitted; the expected total is `75`.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles_count() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let receiver = input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        receiver.assert_yields_only_unordered([1, 2, 3, 4]).await;
    });

    // ∑ (k=1 to 4) S(4,k) × k! = 75
    assert_eq!(explored, 75)
}
2952
/// Expected to panic: once `assume_ordering::<TotalOrder>` observes the order of an
/// unordered batch, asserting the fixed sequence `[1, 2, 3, 4]` does not hold everywhere.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let unordered_batch = input.batch(&tick, nondet!(/** test */));
    let receiver = unordered_batch
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    builder.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        // fails with assume_ordering
        receiver.assert_yields_only([1, 2, 3, 4]).await;
    });
}
2974
/// Counts how many executions the exhaustive simulator explores when the order of an
/// unordered batch of four inputs is observed; the expected total is `192`.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let unordered_batch = input.batch(&tick, nondet!(/** test */));
    let receiver = unordered_batch
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        let _ = receiver.collect::<Vec<_>>().await;
    });

    // 4! * 2^{4 - 1}
    assert_eq!(explored, 192)
}
3000
/// Snapshots a running count over four unordered inputs, checks that the last snapshot is
/// always `4`, and that the simulator explores `16` executions.
#[cfg(feature = "sim")]
#[test]
fn sim_unordered_count_instance_count() {
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let count_snapshots = input.count().snapshot(&tick, nondet!(/** test */));
    let receiver = count_snapshots.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        let snapshots = receiver.collect::<Vec<_>>().await;
        assert!(snapshots.last().unwrap() == &4);
    });

    // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
    assert_eq!(explored, 16)
}
3026}