hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
33#[sealed::sealed]
34pub trait Ordering:
35    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
36{
37    /// The [`StreamOrder`] corresponding to this type.
38    const ORDERING_KIND: StreamOrder;
39}
40
41/// Marks the stream as being totally ordered, which means that there are
42/// no sources of non-determinism (other than intentional ones) that will
43/// affect the order of elements.
44pub enum TotalOrder {}
45
46#[sealed::sealed]
47impl Ordering for TotalOrder {
48    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
49}
50
51/// Marks the stream as having no order, which means that the order of
52/// elements may be affected by non-determinism.
53///
54/// This restricts certain operators, such as `fold` and `reduce`, to only
55/// be used with commutative aggregation functions.
56pub enum NoOrder {}
57
58#[sealed::sealed]
59impl Ordering for NoOrder {
60    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
61}
62
63/// Helper trait for determining the weakest of two orderings.
64#[sealed::sealed]
65pub trait MinOrder<Other: ?Sized> {
66    /// The weaker of the two orderings.
67    type Min: Ordering;
68}
69
70#[sealed::sealed]
71impl MinOrder<NoOrder> for TotalOrder {
72    type Min = NoOrder;
73}
74
75#[sealed::sealed]
76impl MinOrder<TotalOrder> for TotalOrder {
77    type Min = TotalOrder;
78}
79
80#[sealed::sealed]
81impl MinOrder<TotalOrder> for NoOrder {
82    type Min = NoOrder;
83}
84
85#[sealed::sealed]
86impl MinOrder<NoOrder> for NoOrder {
87    type Min = NoOrder;
88}
89
90/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
91#[sealed::sealed]
92pub trait Retries:
93    MinRetries<Self, Min = Self>
94    + MinRetries<ExactlyOnce, Min = Self>
95    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
96{
97    /// The [`StreamRetry`] corresponding to this type.
98    const RETRIES_KIND: StreamRetry;
99}
100
101/// Marks the stream as having deterministic message cardinality, with no
102/// possibility of duplicates.
103pub enum ExactlyOnce {}
104
105#[sealed::sealed]
106impl Retries for ExactlyOnce {
107    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
108}
109
110/// Marks the stream as having non-deterministic message cardinality, which
111/// means that duplicates may occur, but messages will not be dropped.
112pub enum AtLeastOnce {}
113
114#[sealed::sealed]
115impl Retries for AtLeastOnce {
116    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
117}
118
119/// Helper trait for determining the weakest of two retry guarantees.
120#[sealed::sealed]
121pub trait MinRetries<Other: ?Sized> {
122    /// The weaker of the two retry guarantees.
123    type Min: Retries;
124}
125
126#[sealed::sealed]
127impl MinRetries<AtLeastOnce> for ExactlyOnce {
128    type Min = AtLeastOnce;
129}
130
131#[sealed::sealed]
132impl MinRetries<ExactlyOnce> for ExactlyOnce {
133    type Min = ExactlyOnce;
134}
135
136#[sealed::sealed]
137impl MinRetries<ExactlyOnce> for AtLeastOnce {
138    type Min = AtLeastOnce;
139}
140
141#[sealed::sealed]
142impl MinRetries<AtLeastOnce> for AtLeastOnce {
143    type Min = AtLeastOnce;
144}
145
146/// Streaming sequence of elements with type `Type`.
147///
148/// This live collection represents a growing sequence of elements, with new elements being
149/// asynchronously appended to the end of the sequence. This can be used to model the arrival
150/// of network input, such as API requests, or streaming ingestion.
151///
152/// By default, all streams have deterministic ordering and each element is materialized exactly
153/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
154/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
155/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
156///
157/// Type Parameters:
158/// - `Type`: the type of elements in the stream
159/// - `Loc`: the location where the stream is being materialized
160/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
161/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
162///   (default is [`TotalOrder`])
163/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
164///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
165pub struct Stream<
166    Type,
167    Loc,
168    Bound: Boundedness = Unbounded,
169    Order: Ordering = TotalOrder,
170    Retry: Retries = ExactlyOnce,
171> {
172    pub(crate) location: Loc,
173    pub(crate) ir_node: RefCell<HydroNode>,
174
175    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
176}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179    for Stream<T, L, Unbounded, O, R>
180where
181    L: Location<'a>,
182{
183    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184        Stream {
185            location: stream.location,
186            ir_node: stream.ir_node,
187            _phantom: PhantomData,
188        }
189    }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193    for Stream<T, L, B, NoOrder, R>
194where
195    L: Location<'a>,
196{
197    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198        Stream {
199            location: stream.location,
200            ir_node: stream.ir_node,
201            _phantom: PhantomData,
202        }
203    }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207    for Stream<T, L, B, O, AtLeastOnce>
208where
209    L: Location<'a>,
210{
211    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212        Stream {
213            location: stream.location,
214            ir_node: stream.ir_node,
215            _phantom: PhantomData,
216        }
217    }
218}
219
220impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        Stream::defer_tick(self)
226    }
227}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230    for Stream<T, Tick<L>, Bounded, O, R>
231where
232    L: Location<'a>,
233{
234    type Location = Tick<L>;
235
236    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237        Stream::new(
238            location.clone(),
239            HydroNode::CycleSource {
240                ident,
241                metadata: location.new_node_metadata(Self::collection_kind()),
242            },
243        )
244    }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248    for Stream<T, Tick<L>, Bounded, O, R>
249where
250    L: Location<'a>,
251{
252    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253        assert_eq!(
254            Location::id(&self.location),
255            expected_location,
256            "locations do not match"
257        );
258        self.location
259            .flow_state()
260            .borrow_mut()
261            .push_root(HydroRoot::CycleSink {
262                ident,
263                input: Box::new(self.ir_node.into_inner()),
264                op_metadata: HydroIrOpMetadata::new(),
265            });
266    }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270    for Stream<T, L, B, O, R>
271where
272    L: Location<'a> + NoTick,
273{
274    type Location = L;
275
276    fn create_source(ident: syn::Ident, location: L) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                ident,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288    for Stream<T, L, B, O, R>
289where
290    L: Location<'a> + NoTick,
291{
292    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293        assert_eq!(
294            Location::id(&self.location),
295            expected_location,
296            "locations do not match"
297        );
298        self.location
299            .flow_state()
300            .borrow_mut()
301            .push_root(HydroRoot::CycleSink {
302                ident,
303                input: Box::new(self.ir_node.into_inner()),
304                op_metadata: HydroIrOpMetadata::new(),
305            });
306    }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311    T: Clone,
312    L: Location<'a>,
313{
314    fn clone(&self) -> Self {
315        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317            *self.ir_node.borrow_mut() = HydroNode::Tee {
318                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319                metadata: self.location.new_node_metadata(Self::collection_kind()),
320            };
321        }
322
323        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324            Stream {
325                location: self.location.clone(),
326                ir_node: HydroNode::Tee {
327                    inner: TeeNode(inner.0.clone()),
328                    metadata: metadata.clone(),
329                }
330                .into(),
331                _phantom: PhantomData,
332            }
333        } else {
334            unreachable!()
335        }
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341    L: Location<'a>,
342{
343    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347        Stream {
348            location,
349            ir_node: RefCell::new(ir_node),
350            _phantom: PhantomData,
351        }
352    }
353
354    /// Returns the [`Location`] where this stream is being materialized.
355    pub fn location(&self) -> &L {
356        &self.location
357    }
358
359    pub(crate) fn collection_kind() -> CollectionKind {
360        CollectionKind::Stream {
361            bound: B::BOUND_KIND,
362            order: O::ORDERING_KIND,
363            retry: R::RETRIES_KIND,
364            element_type: stageleft::quote_type::<T>().into(),
365        }
366    }
367
368    /// Produces a stream based on invoking `f` on each element.
369    /// If you do not want to modify the stream and instead only want to view
370    /// each item use [`Stream::inspect`] instead.
371    ///
372    /// # Example
373    /// ```rust
374    /// # #[cfg(feature = "deploy")] {
375    /// # use hydro_lang::prelude::*;
376    /// # use futures::StreamExt;
377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
378    /// let words = process.source_iter(q!(vec!["hello", "world"]));
379    /// words.map(q!(|x| x.to_uppercase()))
380    /// # }, |mut stream| async move {
381    /// # for w in vec!["HELLO", "WORLD"] {
382    /// #     assert_eq!(stream.next().await.unwrap(), w);
383    /// # }
384    /// # }));
385    /// # }
386    /// ```
387    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
388    where
389        F: Fn(T) -> U + 'a,
390    {
391        let f = f.splice_fn1_ctx(&self.location).into();
392        Stream::new(
393            self.location.clone(),
394            HydroNode::Map {
395                f,
396                input: Box::new(self.ir_node.into_inner()),
397                metadata: self
398                    .location
399                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
400            },
401        )
402    }
403
404    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
405    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
406    /// for the output type `U` must produce items in a **deterministic** order.
407    ///
408    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
409    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
410    ///
411    /// # Example
412    /// ```rust
413    /// # #[cfg(feature = "deploy")] {
414    /// # use hydro_lang::prelude::*;
415    /// # use futures::StreamExt;
416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
417    /// process
418    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
419    ///     .flat_map_ordered(q!(|x| x))
420    /// # }, |mut stream| async move {
421    /// // 1, 2, 3, 4
422    /// # for w in (1..5) {
423    /// #     assert_eq!(stream.next().await.unwrap(), w);
424    /// # }
425    /// # }));
426    /// # }
427    /// ```
428    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
429    where
430        I: IntoIterator<Item = U>,
431        F: Fn(T) -> I + 'a,
432    {
433        let f = f.splice_fn1_ctx(&self.location).into();
434        Stream::new(
435            self.location.clone(),
436            HydroNode::FlatMap {
437                f,
438                input: Box::new(self.ir_node.into_inner()),
439                metadata: self
440                    .location
441                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
442            },
443        )
444    }
445
446    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
447    /// for the output type `U` to produce items in any order.
448    ///
449    /// # Example
450    /// ```rust
451    /// # #[cfg(feature = "deploy")] {
452    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
453    /// # use futures::StreamExt;
454    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
455    /// process
456    ///     .source_iter(q!(vec![
457    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
458    ///         std::collections::HashSet::from_iter(vec![3, 4]),
459    ///     ]))
460    ///     .flat_map_unordered(q!(|x| x))
461    /// # }, |mut stream| async move {
462    /// // 1, 2, 3, 4, but in no particular order
463    /// # let mut results = Vec::new();
464    /// # for w in (1..5) {
465    /// #     results.push(stream.next().await.unwrap());
466    /// # }
467    /// # results.sort();
468    /// # assert_eq!(results, vec![1, 2, 3, 4]);
469    /// # }));
470    /// # }
471    /// ```
472    pub fn flat_map_unordered<U, I, F>(
473        self,
474        f: impl IntoQuotedMut<'a, F, L>,
475    ) -> Stream<U, L, B, NoOrder, R>
476    where
477        I: IntoIterator<Item = U>,
478        F: Fn(T) -> I + 'a,
479    {
480        let f = f.splice_fn1_ctx(&self.location).into();
481        Stream::new(
482            self.location.clone(),
483            HydroNode::FlatMap {
484                f,
485                input: Box::new(self.ir_node.into_inner()),
486                metadata: self
487                    .location
488                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
489            },
490        )
491    }
492
493    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
494    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
495    ///
496    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
497    /// not deterministic, use [`Stream::flatten_unordered`] instead.
498    ///
499    /// ```rust
500    /// # #[cfg(feature = "deploy")] {
501    /// # use hydro_lang::prelude::*;
502    /// # use futures::StreamExt;
503    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
504    /// process
505    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
506    ///     .flatten_ordered()
507    /// # }, |mut stream| async move {
508    /// // 1, 2, 3, 4
509    /// # for w in (1..5) {
510    /// #     assert_eq!(stream.next().await.unwrap(), w);
511    /// # }
512    /// # }));
513    /// # }
514    /// ```
515    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
516    where
517        T: IntoIterator<Item = U>,
518    {
519        self.flat_map_ordered(q!(|d| d))
520    }
521
522    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
523    /// for the element type `T` to produce items in any order.
524    ///
525    /// # Example
526    /// ```rust
527    /// # #[cfg(feature = "deploy")] {
528    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
529    /// # use futures::StreamExt;
530    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
531    /// process
532    ///     .source_iter(q!(vec![
533    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
534    ///         std::collections::HashSet::from_iter(vec![3, 4]),
535    ///     ]))
536    ///     .flatten_unordered()
537    /// # }, |mut stream| async move {
538    /// // 1, 2, 3, 4, but in no particular order
539    /// # let mut results = Vec::new();
540    /// # for w in (1..5) {
541    /// #     results.push(stream.next().await.unwrap());
542    /// # }
543    /// # results.sort();
544    /// # assert_eq!(results, vec![1, 2, 3, 4]);
545    /// # }));
546    /// # }
547    /// ```
548    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
549    where
550        T: IntoIterator<Item = U>,
551    {
552        self.flat_map_unordered(q!(|d| d))
553    }
554
555    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
556    /// `f`, preserving the order of the elements.
557    ///
558    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
559    /// not modify or take ownership of the values. If you need to modify the values while filtering
560    /// use [`Stream::filter_map`] instead.
561    ///
562    /// # Example
563    /// ```rust
564    /// # #[cfg(feature = "deploy")] {
565    /// # use hydro_lang::prelude::*;
566    /// # use futures::StreamExt;
567    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
568    /// process
569    ///     .source_iter(q!(vec![1, 2, 3, 4]))
570    ///     .filter(q!(|&x| x > 2))
571    /// # }, |mut stream| async move {
572    /// // 3, 4
573    /// # for w in (3..5) {
574    /// #     assert_eq!(stream.next().await.unwrap(), w);
575    /// # }
576    /// # }));
577    /// # }
578    /// ```
579    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
580    where
581        F: Fn(&T) -> bool + 'a,
582    {
583        let f = f.splice_fn1_borrow_ctx(&self.location).into();
584        Stream::new(
585            self.location.clone(),
586            HydroNode::Filter {
587                f,
588                input: Box::new(self.ir_node.into_inner()),
589                metadata: self.location.new_node_metadata(Self::collection_kind()),
590            },
591        )
592    }
593
594    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
595    ///
596    /// # Example
597    /// ```rust
598    /// # #[cfg(feature = "deploy")] {
599    /// # use hydro_lang::prelude::*;
600    /// # use futures::StreamExt;
601    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
602    /// process
603    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
604    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
605    /// # }, |mut stream| async move {
606    /// // 1, 2
607    /// # for w in (1..3) {
608    /// #     assert_eq!(stream.next().await.unwrap(), w);
609    /// # }
610    /// # }));
611    /// # }
612    /// ```
613    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
614    where
615        F: Fn(T) -> Option<U> + 'a,
616    {
617        let f = f.splice_fn1_ctx(&self.location).into();
618        Stream::new(
619            self.location.clone(),
620            HydroNode::FilterMap {
621                f,
622                input: Box::new(self.ir_node.into_inner()),
623                metadata: self
624                    .location
625                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
626            },
627        )
628    }
629
630    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
631    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
632    /// If `other` is an empty [`Optional`], no values will be produced.
633    ///
634    /// # Example
635    /// ```rust
636    /// # #[cfg(feature = "deploy")] {
637    /// # use hydro_lang::prelude::*;
638    /// # use futures::StreamExt;
639    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
640    /// let tick = process.tick();
641    /// let batch = process
642    ///   .source_iter(q!(vec![1, 2, 3, 4]))
643    ///   .batch(&tick, nondet!(/** test */));
644    /// let count = batch.clone().count(); // `count()` returns a singleton
645    /// batch.cross_singleton(count).all_ticks()
646    /// # }, |mut stream| async move {
647    /// // (1, 4), (2, 4), (3, 4), (4, 4)
648    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
649    /// #     assert_eq!(stream.next().await.unwrap(), w);
650    /// # }
651    /// # }));
652    /// # }
653    /// ```
654    pub fn cross_singleton<O2>(
655        self,
656        other: impl Into<Optional<O2, L, Bounded>>,
657    ) -> Stream<(T, O2), L, B, O, R>
658    where
659        O2: Clone,
660    {
661        let other: Optional<O2, L, Bounded> = other.into();
662        check_matching_location(&self.location, &other.location);
663
664        Stream::new(
665            self.location.clone(),
666            HydroNode::CrossSingleton {
667                left: Box::new(self.ir_node.into_inner()),
668                right: Box::new(other.ir_node.into_inner()),
669                metadata: self
670                    .location
671                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
672            },
673        )
674    }
675
676    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
677    ///
678    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
679    /// leader of a cluster.
680    ///
681    /// # Example
682    /// ```rust
683    /// # #[cfg(feature = "deploy")] {
684    /// # use hydro_lang::prelude::*;
685    /// # use futures::StreamExt;
686    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
687    /// let tick = process.tick();
688    /// // ticks are lazy by default, forces the second tick to run
689    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
690    ///
691    /// let batch_first_tick = process
692    ///   .source_iter(q!(vec![1, 2, 3, 4]))
693    ///   .batch(&tick, nondet!(/** test */));
694    /// let batch_second_tick = process
695    ///   .source_iter(q!(vec![5, 6, 7, 8]))
696    ///   .batch(&tick, nondet!(/** test */))
697    ///   .defer_tick(); // appears on the second tick
698    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
699    /// batch_first_tick.chain(batch_second_tick)
700    ///   .filter_if_some(some_on_first_tick)
701    ///   .all_ticks()
702    /// # }, |mut stream| async move {
703    /// // [1, 2, 3, 4]
704    /// # for w in vec![1, 2, 3, 4] {
705    /// #     assert_eq!(stream.next().await.unwrap(), w);
706    /// # }
707    /// # }));
708    /// # }
709    /// ```
710    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
711        self.cross_singleton(signal.map(q!(|_u| ())))
712            .map(q!(|(d, _signal)| d))
713    }
714
715    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
716    ///
717    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
718    /// some local state.
719    ///
720    /// # Example
721    /// ```rust
722    /// # #[cfg(feature = "deploy")] {
723    /// # use hydro_lang::prelude::*;
724    /// # use futures::StreamExt;
725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
726    /// let tick = process.tick();
727    /// // ticks are lazy by default, forces the second tick to run
728    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
729    ///
730    /// let batch_first_tick = process
731    ///   .source_iter(q!(vec![1, 2, 3, 4]))
732    ///   .batch(&tick, nondet!(/** test */));
733    /// let batch_second_tick = process
734    ///   .source_iter(q!(vec![5, 6, 7, 8]))
735    ///   .batch(&tick, nondet!(/** test */))
736    ///   .defer_tick(); // appears on the second tick
737    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
738    /// batch_first_tick.chain(batch_second_tick)
739    ///   .filter_if_none(some_on_first_tick)
740    ///   .all_ticks()
741    /// # }, |mut stream| async move {
742    /// // [5, 6, 7, 8]
743    /// # for w in vec![5, 6, 7, 8] {
744    /// #     assert_eq!(stream.next().await.unwrap(), w);
745    /// # }
746    /// # }));
747    /// # }
748    /// ```
749    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
750        self.filter_if_some(
751            other
752                .map(q!(|_| ()))
753                .into_singleton()
754                .filter(q!(|o| o.is_none())),
755        )
756    }
757
758    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
759    /// tupled pairs in a non-deterministic order.
760    ///
761    /// # Example
762    /// ```rust
763    /// # #[cfg(feature = "deploy")] {
764    /// # use hydro_lang::prelude::*;
765    /// # use std::collections::HashSet;
766    /// # use futures::StreamExt;
767    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
768    /// let tick = process.tick();
769    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
770    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
771    /// stream1.cross_product(stream2)
772    /// # }, |mut stream| async move {
773    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
774    /// # stream.map(|i| assert!(expected.contains(&i)));
775    /// # }));
776    /// # }
777    /// ```
778    pub fn cross_product<T2, O2: Ordering>(
779        self,
780        other: Stream<T2, L, B, O2, R>,
781    ) -> Stream<(T, T2), L, B, NoOrder, R>
782    where
783        T: Clone,
784        T2: Clone,
785    {
786        check_matching_location(&self.location, &other.location);
787
788        Stream::new(
789            self.location.clone(),
790            HydroNode::CrossProduct {
791                left: Box::new(self.ir_node.into_inner()),
792                right: Box::new(other.ir_node.into_inner()),
793                metadata: self
794                    .location
795                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
796            },
797        )
798    }
799
800    /// Takes one stream as input and filters out any duplicate occurrences. The output
801    /// contains all unique values from the input.
802    ///
803    /// # Example
804    /// ```rust
805    /// # #[cfg(feature = "deploy")] {
806    /// # use hydro_lang::prelude::*;
807    /// # use futures::StreamExt;
808    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
809    /// let tick = process.tick();
810    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
811    /// # }, |mut stream| async move {
812    /// # for w in vec![1, 2, 3, 4] {
813    /// #     assert_eq!(stream.next().await.unwrap(), w);
814    /// # }
815    /// # }));
816    /// # }
817    /// ```
818    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
819    where
820        T: Eq + Hash,
821    {
822        Stream::new(
823            self.location.clone(),
824            HydroNode::Unique {
825                input: Box::new(self.ir_node.into_inner()),
826                metadata: self
827                    .location
828                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
829            },
830        )
831    }
832
833    /// Outputs everything in this stream that is *not* contained in the `other` stream.
834    ///
835    /// The `other` stream must be [`Bounded`], since this function will wait until
836    /// all its elements are available before producing any output.
837    /// # Example
838    /// ```rust
839    /// # #[cfg(feature = "deploy")] {
840    /// # use hydro_lang::prelude::*;
841    /// # use futures::StreamExt;
842    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
843    /// let tick = process.tick();
844    /// let stream = process
845    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
846    ///   .batch(&tick, nondet!(/** test */));
847    /// let batch = process
848    ///   .source_iter(q!(vec![1, 2]))
849    ///   .batch(&tick, nondet!(/** test */));
850    /// stream.filter_not_in(batch).all_ticks()
851    /// # }, |mut stream| async move {
852    /// # for w in vec![3, 4] {
853    /// #     assert_eq!(stream.next().await.unwrap(), w);
854    /// # }
855    /// # }));
856    /// # }
857    /// ```
858    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
859    where
860        T: Eq + Hash,
861    {
862        check_matching_location(&self.location, &other.location);
863
864        Stream::new(
865            self.location.clone(),
866            HydroNode::Difference {
867                pos: Box::new(self.ir_node.into_inner()),
868                neg: Box::new(other.ir_node.into_inner()),
869                metadata: self
870                    .location
871                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
872            },
873        )
874    }
875
876    /// An operator which allows you to "inspect" each element of a stream without
877    /// modifying it. The closure `f` is called on a reference to each item. This is
878    /// mainly useful for debugging, and should not be used to generate side-effects.
879    ///
880    /// # Example
881    /// ```rust
882    /// # #[cfg(feature = "deploy")] {
883    /// # use hydro_lang::prelude::*;
884    /// # use futures::StreamExt;
885    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
886    /// let nums = process.source_iter(q!(vec![1, 2]));
887    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
888    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
889    /// # }, |mut stream| async move {
890    /// # for w in vec![1, 2] {
891    /// #     assert_eq!(stream.next().await.unwrap(), w);
892    /// # }
893    /// # }));
894    /// # }
895    /// ```
896    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
897    where
898        F: Fn(&T) + 'a,
899    {
900        let f = f.splice_fn1_borrow_ctx(&self.location).into();
901
902        Stream::new(
903            self.location.clone(),
904            HydroNode::Inspect {
905                f,
906                input: Box::new(self.ir_node.into_inner()),
907                metadata: self.location.new_node_metadata(Self::collection_kind()),
908            },
909        )
910    }
911
912    /// An operator which allows you to "name" a `HydroNode`.
913    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
914    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
915        {
916            let mut node = self.ir_node.borrow_mut();
917            let metadata = node.metadata_mut();
918            metadata.tag = Some(name.to_string());
919        }
920        self
921    }
922
923    /// Explicitly "casts" the stream to a type with a different ordering
924    /// guarantee. Useful in unsafe code where the ordering cannot be proven
925    /// by the type-system.
926    ///
927    /// # Non-Determinism
928    /// This function is used as an escape hatch, and any mistakes in the
929    /// provided ordering guarantee will propagate into the guarantees
930    /// for the rest of the program.
931    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
932        if O::ORDERING_KIND == O2::ORDERING_KIND {
933            Stream::new(self.location, self.ir_node.into_inner())
934        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
935            // We can always weaken the ordering guarantee
936            Stream::new(
937                self.location.clone(),
938                HydroNode::Cast {
939                    inner: Box::new(self.ir_node.into_inner()),
940                    metadata: self
941                        .location
942                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
943                },
944            )
945        } else {
946            Stream::new(
947                self.location.clone(),
948                HydroNode::ObserveNonDet {
949                    inner: Box::new(self.ir_node.into_inner()),
950                    trusted: false,
951                    metadata: self
952                        .location
953                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
954                },
955            )
956        }
957    }
958
959    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
960    // is not observable
961    fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
962        if O::ORDERING_KIND == O2::ORDERING_KIND {
963            Stream::new(self.location, self.ir_node.into_inner())
964        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
965            // We can always weaken the ordering guarantee
966            Stream::new(
967                self.location.clone(),
968                HydroNode::Cast {
969                    inner: Box::new(self.ir_node.into_inner()),
970                    metadata: self
971                        .location
972                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
973                },
974            )
975        } else {
976            Stream::new(
977                self.location.clone(),
978                HydroNode::ObserveNonDet {
979                    inner: Box::new(self.ir_node.into_inner()),
980                    trusted: true,
981                    metadata: self
982                        .location
983                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
984                },
985            )
986        }
987    }
988
989    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
990    /// which is always safe because that is the weakest possible guarantee.
991    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
992        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
993        self.assume_ordering::<NoOrder>(nondet)
994    }
995
996    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
997    /// enforcing that `O2` is weaker than the input ordering guarantee.
998    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
999        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1000        self.assume_ordering::<O2>(nondet)
1001    }
1002
1003    /// Explicitly "casts" the stream to a type with a different retries
1004    /// guarantee. Useful in unsafe code where the lack of retries cannot
1005    /// be proven by the type-system.
1006    ///
1007    /// # Non-Determinism
1008    /// This function is used as an escape hatch, and any mistakes in the
1009    /// provided retries guarantee will propagate into the guarantees
1010    /// for the rest of the program.
1011    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1012        if R::RETRIES_KIND == R2::RETRIES_KIND {
1013            Stream::new(self.location, self.ir_node.into_inner())
1014        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1015            // We can always weaken the retries guarantee
1016            Stream::new(
1017                self.location.clone(),
1018                HydroNode::Cast {
1019                    inner: Box::new(self.ir_node.into_inner()),
1020                    metadata: self
1021                        .location
1022                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1023                },
1024            )
1025        } else {
1026            Stream::new(
1027                self.location.clone(),
1028                HydroNode::ObserveNonDet {
1029                    inner: Box::new(self.ir_node.into_inner()),
1030                    trusted: false,
1031                    metadata: self
1032                        .location
1033                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1034                },
1035            )
1036        }
1037    }
1038
1039    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1040    // is not observable
1041    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1042        if R::RETRIES_KIND == R2::RETRIES_KIND {
1043            Stream::new(self.location, self.ir_node.into_inner())
1044        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1045            // We can always weaken the retries guarantee
1046            Stream::new(
1047                self.location.clone(),
1048                HydroNode::Cast {
1049                    inner: Box::new(self.ir_node.into_inner()),
1050                    metadata: self
1051                        .location
1052                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1053                },
1054            )
1055        } else {
1056            Stream::new(
1057                self.location.clone(),
1058                HydroNode::ObserveNonDet {
1059                    inner: Box::new(self.ir_node.into_inner()),
1060                    trusted: true,
1061                    metadata: self
1062                        .location
1063                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1064                },
1065            )
1066        }
1067    }
1068
1069    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1070    /// which is always safe because that is the weakest possible guarantee.
1071    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1072        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1073        self.assume_retries::<AtLeastOnce>(nondet)
1074    }
1075
1076    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1077    /// enforcing that `R2` is weaker than the input retries guarantee.
1078    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1079        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1080        self.assume_retries::<R2>(nondet)
1081    }
1082}
1083
1084impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1085where
1086    L: Location<'a>,
1087{
1088    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1089    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1090    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1091        self.assume_retries(
1092            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1093        )
1094    }
1095}
1096
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clones each element of the stream, turning a stream of `&T` into a stream of owned `T`;
    /// akin to `map(q!(|d| d.clone()))`.
    ///
    /// The boundedness (`B`), ordering (`O`), and retries (`R`) parameters of the input are
    /// carried over unchanged to the output type.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
1125
1126impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1127where
1128    L: Location<'a>,
1129{
1130    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1131    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1132    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1133    ///
1134    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1135    /// and there may be duplicates.
1136    ///
1137    /// # Example
1138    /// ```rust
1139    /// # #[cfg(feature = "deploy")] {
1140    /// # use hydro_lang::prelude::*;
1141    /// # use futures::StreamExt;
1142    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1143    /// let tick = process.tick();
1144    /// let bools = process.source_iter(q!(vec![false, true, false]));
1145    /// let batch = bools.batch(&tick, nondet!(/** test */));
1146    /// batch
1147    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1148    ///     .all_ticks()
1149    /// # }, |mut stream| async move {
1150    /// // true
1151    /// # assert_eq!(stream.next().await.unwrap(), true);
1152    /// # }));
1153    /// # }
1154    /// ```
1155    pub fn fold_commutative_idempotent<A, I, F>(
1156        self,
1157        init: impl IntoQuotedMut<'a, I, L>,
1158        comb: impl IntoQuotedMut<'a, F, L>,
1159    ) -> Singleton<A, L, B>
1160    where
1161        I: Fn() -> A + 'a,
1162        F: Fn(&mut A, T),
1163    {
1164        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1165        self.assume_ordering(nondet)
1166            .assume_retries(nondet)
1167            .fold(init, comb)
1168    }
1169
1170    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1171    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1172    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1173    /// reference, so that it can be modified in place.
1174    ///
1175    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1176    /// and there may be duplicates.
1177    ///
1178    /// # Example
1179    /// ```rust
1180    /// # #[cfg(feature = "deploy")] {
1181    /// # use hydro_lang::prelude::*;
1182    /// # use futures::StreamExt;
1183    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1184    /// let tick = process.tick();
1185    /// let bools = process.source_iter(q!(vec![false, true, false]));
1186    /// let batch = bools.batch(&tick, nondet!(/** test */));
1187    /// batch
1188    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1189    ///     .all_ticks()
1190    /// # }, |mut stream| async move {
1191    /// // true
1192    /// # assert_eq!(stream.next().await.unwrap(), true);
1193    /// # }));
1194    /// # }
1195    /// ```
1196    pub fn reduce_commutative_idempotent<F>(
1197        self,
1198        comb: impl IntoQuotedMut<'a, F, L>,
1199    ) -> Optional<T, L, B>
1200    where
1201        F: Fn(&mut T, T) + 'a,
1202    {
1203        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1204        self.assume_retries(nondet).reduce_commutative(comb)
1205    }
1206
1207    // only for internal APIs that have been carefully vetted, will eventually be removed once we
1208    // have algebraic verification of these properties
1209    fn reduce_commutative_idempotent_trusted<F>(
1210        self,
1211        comb: impl IntoQuotedMut<'a, F, L>,
1212    ) -> Optional<T, L, B>
1213    where
1214        F: Fn(&mut T, T) + 'a,
1215    {
1216        self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1217            .reduce_commutative_trusted(comb)
1218    }
1219
1220    /// Computes the maximum element in the stream as an [`Optional`], which
1221    /// will be empty until the first element in the input arrives.
1222    ///
1223    /// # Example
1224    /// ```rust
1225    /// # #[cfg(feature = "deploy")] {
1226    /// # use hydro_lang::prelude::*;
1227    /// # use futures::StreamExt;
1228    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229    /// let tick = process.tick();
1230    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1231    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1232    /// batch.max().all_ticks()
1233    /// # }, |mut stream| async move {
1234    /// // 4
1235    /// # assert_eq!(stream.next().await.unwrap(), 4);
1236    /// # }));
1237    /// # }
1238    /// ```
1239    pub fn max(self) -> Optional<T, L, B>
1240    where
1241        T: Ord,
1242    {
1243        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1244            if new > *curr {
1245                *curr = new;
1246            }
1247        }))
1248    }
1249
1250    /// Computes the minimum element in the stream as an [`Optional`], which
1251    /// will be empty until the first element in the input arrives.
1252    ///
1253    /// # Example
1254    /// ```rust
1255    /// # #[cfg(feature = "deploy")] {
1256    /// # use hydro_lang::prelude::*;
1257    /// # use futures::StreamExt;
1258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259    /// let tick = process.tick();
1260    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1261    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1262    /// batch.min().all_ticks()
1263    /// # }, |mut stream| async move {
1264    /// // 1
1265    /// # assert_eq!(stream.next().await.unwrap(), 1);
1266    /// # }));
1267    /// # }
1268    /// ```
1269    pub fn min(self) -> Optional<T, L, B>
1270    where
1271        T: Ord,
1272    {
1273        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1274            if new < *curr {
1275                *curr = new;
1276            }
1277        }))
1278    }
1279}
1280
1281impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1282where
1283    L: Location<'a>,
1284{
1285    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1286    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1287    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1288    ///
1289    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1290    ///
1291    /// # Example
1292    /// ```rust
1293    /// # #[cfg(feature = "deploy")] {
1294    /// # use hydro_lang::prelude::*;
1295    /// # use futures::StreamExt;
1296    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297    /// let tick = process.tick();
1298    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1299    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1300    /// batch
1301    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1302    ///     .all_ticks()
1303    /// # }, |mut stream| async move {
1304    /// // 10
1305    /// # assert_eq!(stream.next().await.unwrap(), 10);
1306    /// # }));
1307    /// # }
1308    /// ```
1309    pub fn fold_commutative<A, I, F>(
1310        self,
1311        init: impl IntoQuotedMut<'a, I, L>,
1312        comb: impl IntoQuotedMut<'a, F, L>,
1313    ) -> Singleton<A, L, B>
1314    where
1315        I: Fn() -> A + 'a,
1316        F: Fn(&mut A, T),
1317    {
1318        let nondet = nondet!(/** the combinator function is commutative */);
1319        self.assume_ordering(nondet).fold(init, comb)
1320    }
1321
1322    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1323    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1324    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1325    /// reference, so that it can be modified in place.
1326    ///
1327    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1328    ///
1329    /// # Example
1330    /// ```rust
1331    /// # #[cfg(feature = "deploy")] {
1332    /// # use hydro_lang::prelude::*;
1333    /// # use futures::StreamExt;
1334    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335    /// let tick = process.tick();
1336    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1337    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1338    /// batch
1339    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1340    ///     .all_ticks()
1341    /// # }, |mut stream| async move {
1342    /// // 10
1343    /// # assert_eq!(stream.next().await.unwrap(), 10);
1344    /// # }));
1345    /// # }
1346    /// ```
1347    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1348    where
1349        F: Fn(&mut T, T) + 'a,
1350    {
1351        let nondet = nondet!(/** the combinator function is commutative */);
1352        self.assume_ordering(nondet).reduce(comb)
1353    }
1354
1355    fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1356    where
1357        F: Fn(&mut T, T) + 'a,
1358    {
1359        let ordered = if B::BOUNDED {
1360            self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1361        } else {
1362            self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1363        };
1364
1365        ordered.reduce(comb)
1366    }
1367
1368    /// Computes the number of elements in the stream as a [`Singleton`].
1369    ///
1370    /// # Example
1371    /// ```rust
1372    /// # #[cfg(feature = "deploy")] {
1373    /// # use hydro_lang::prelude::*;
1374    /// # use futures::StreamExt;
1375    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1376    /// let tick = process.tick();
1377    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1378    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1379    /// batch.count().all_ticks()
1380    /// # }, |mut stream| async move {
1381    /// // 4
1382    /// # assert_eq!(stream.next().await.unwrap(), 4);
1383    /// # }));
1384    /// # }
1385    /// ```
1386    pub fn count(self) -> Singleton<usize, L, B> {
1387        self.assume_ordering_trusted(nondet!(
1388            /// Order does not affect eventual count, and also does not affect intermediate states.
1389        ))
1390        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1391    }
1392}
1393
1394impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1395where
1396    L: Location<'a>,
1397{
1398    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1399    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1400    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1401    ///
1402    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1403    ///
1404    /// # Example
1405    /// ```rust
1406    /// # #[cfg(feature = "deploy")] {
1407    /// # use hydro_lang::prelude::*;
1408    /// # use futures::StreamExt;
1409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410    /// let tick = process.tick();
1411    /// let bools = process.source_iter(q!(vec![false, true, false]));
1412    /// let batch = bools.batch(&tick, nondet!(/** test */));
1413    /// batch
1414    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1415    ///     .all_ticks()
1416    /// # }, |mut stream| async move {
1417    /// // true
1418    /// # assert_eq!(stream.next().await.unwrap(), true);
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn fold_idempotent<A, I, F>(
1423        self,
1424        init: impl IntoQuotedMut<'a, I, L>,
1425        comb: impl IntoQuotedMut<'a, F, L>,
1426    ) -> Singleton<A, L, B>
1427    where
1428        I: Fn() -> A + 'a,
1429        F: Fn(&mut A, T),
1430    {
1431        let nondet = nondet!(/** the combinator function is idempotent */);
1432        self.assume_retries(nondet).fold(init, comb)
1433    }
1434
1435    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1436    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1437    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1438    /// reference, so that it can be modified in place.
1439    ///
1440    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1441    ///
1442    /// # Example
1443    /// ```rust
1444    /// # #[cfg(feature = "deploy")] {
1445    /// # use hydro_lang::prelude::*;
1446    /// # use futures::StreamExt;
1447    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1448    /// let tick = process.tick();
1449    /// let bools = process.source_iter(q!(vec![false, true, false]));
1450    /// let batch = bools.batch(&tick, nondet!(/** test */));
1451    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1452    /// # }, |mut stream| async move {
1453    /// // true
1454    /// # assert_eq!(stream.next().await.unwrap(), true);
1455    /// # }));
1456    /// # }
1457    /// ```
1458    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1459    where
1460        F: Fn(&mut T, T) + 'a,
1461    {
1462        let nondet = nondet!(/** the combinator function is idempotent */);
1463        self.assume_retries(nondet).reduce(comb)
1464    }
1465
1466    /// Computes the first element in the stream as an [`Optional`], which
1467    /// will be empty until the first element in the input arrives.
1468    ///
1469    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1470    /// re-ordering of elements may cause the first element to change.
1471    ///
1472    /// # Example
1473    /// ```rust
1474    /// # #[cfg(feature = "deploy")] {
1475    /// # use hydro_lang::prelude::*;
1476    /// # use futures::StreamExt;
1477    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1478    /// let tick = process.tick();
1479    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1480    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1481    /// batch.first().all_ticks()
1482    /// # }, |mut stream| async move {
1483    /// // 1
1484    /// # assert_eq!(stream.next().await.unwrap(), 1);
1485    /// # }));
1486    /// # }
1487    /// ```
1488    pub fn first(self) -> Optional<T, L, B> {
1489        self.reduce_idempotent(q!(|_, _| {}))
1490    }
1491
1492    /// Computes the last element in the stream as an [`Optional`], which
1493    /// will be empty until an element in the input arrives.
1494    ///
1495    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1496    /// re-ordering of elements may cause the last element to change.
1497    ///
1498    /// # Example
1499    /// ```rust
1500    /// # #[cfg(feature = "deploy")] {
1501    /// # use hydro_lang::prelude::*;
1502    /// # use futures::StreamExt;
1503    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1504    /// let tick = process.tick();
1505    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1506    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1507    /// batch.last().all_ticks()
1508    /// # }, |mut stream| async move {
1509    /// // 4
1510    /// # assert_eq!(stream.next().await.unwrap(), 4);
1511    /// # }));
1512    /// # }
1513    /// ```
1514    pub fn last(self) -> Optional<T, L, B> {
1515        self.reduce_idempotent(q!(|curr, new| *curr = new))
1516    }
1517}
1518
1519impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1520where
1521    L: Location<'a>,
1522{
1523    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1524    ///
1525    /// # Example
1526    /// ```rust
1527    /// # #[cfg(feature = "deploy")] {
1528    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1529    /// # use futures::StreamExt;
1530    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1531    /// let tick = process.tick();
1532    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1533    /// numbers.enumerate()
1534    /// # }, |mut stream| async move {
1535    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1536    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1537    /// #     assert_eq!(stream.next().await.unwrap(), w);
1538    /// # }
1539    /// # }));
1540    /// # }
1541    /// ```
1542    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1543        Stream::new(
1544            self.location.clone(),
1545            HydroNode::Enumerate {
1546                input: Box::new(self.ir_node.into_inner()),
1547                metadata: self.location.new_node_metadata(Stream::<
1548                    (usize, T),
1549                    L,
1550                    B,
1551                    TotalOrder,
1552                    ExactlyOnce,
1553                >::collection_kind()),
1554            },
1555        )
1556    }
1557
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1559    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1560    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1561    ///
1562    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1563    /// to depend on the order of elements in the stream.
1564    ///
1565    /// # Example
1566    /// ```rust
1567    /// # #[cfg(feature = "deploy")] {
1568    /// # use hydro_lang::prelude::*;
1569    /// # use futures::StreamExt;
1570    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1571    /// let tick = process.tick();
1572    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1573    /// let batch = words.batch(&tick, nondet!(/** test */));
1574    /// batch
1575    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1576    ///     .all_ticks()
1577    /// # }, |mut stream| async move {
1578    /// // "HELLOWORLD"
1579    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1580    /// # }));
1581    /// # }
1582    /// ```
1583    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1584        self,
1585        init: impl IntoQuotedMut<'a, I, L>,
1586        comb: impl IntoQuotedMut<'a, F, L>,
1587    ) -> Singleton<A, L, B> {
1588        let init = init.splice_fn0_ctx(&self.location).into();
1589        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1590
1591        let core = HydroNode::Fold {
1592            init,
1593            acc: comb,
1594            input: Box::new(self.ir_node.into_inner()),
1595            metadata: self
1596                .location
1597                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1598        };
1599
1600        Singleton::new(self.location, core)
1601    }
1602
1603    /// Collects all the elements of this stream into a single [`Vec`] element.
1604    ///
1605    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1606    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1607    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1608    /// the vector at an arbitrary point in time.
1609    ///
1610    /// # Example
1611    /// ```rust
1612    /// # #[cfg(feature = "deploy")] {
1613    /// # use hydro_lang::prelude::*;
1614    /// # use futures::StreamExt;
1615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1616    /// let tick = process.tick();
1617    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1618    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1619    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1620    /// # }, |mut stream| async move {
1621    /// // [ vec![1, 2, 3, 4] ]
1622    /// # for w in vec![vec![1, 2, 3, 4]] {
1623    /// #     assert_eq!(stream.next().await.unwrap(), w);
1624    /// # }
1625    /// # }));
1626    /// # }
1627    /// ```
1628    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1629        self.fold(
1630            q!(|| vec![]),
1631            q!(|acc, v| {
1632                acc.push(v);
1633            }),
1634        )
1635    }
1636
1637    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1638    /// and emitting each intermediate result.
1639    ///
1640    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1641    /// containing all intermediate accumulated values. The scan operation can also terminate early
1642    /// by returning `None`.
1643    ///
1644    /// The function takes a mutable reference to the accumulator and the current element, and returns
1645    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1646    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1647    ///
1648    /// # Examples
1649    ///
1650    /// Basic usage - running sum:
1651    /// ```rust
1652    /// # #[cfg(feature = "deploy")] {
1653    /// # use hydro_lang::prelude::*;
1654    /// # use futures::StreamExt;
1655    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1656    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1657    ///     q!(|| 0),
1658    ///     q!(|acc, x| {
1659    ///         *acc += x;
1660    ///         Some(*acc)
1661    ///     }),
1662    /// )
1663    /// # }, |mut stream| async move {
1664    /// // Output: 1, 3, 6, 10
1665    /// # for w in vec![1, 3, 6, 10] {
1666    /// #     assert_eq!(stream.next().await.unwrap(), w);
1667    /// # }
1668    /// # }));
1669    /// # }
1670    /// ```
1671    ///
1672    /// Early termination example:
1673    /// ```rust
1674    /// # #[cfg(feature = "deploy")] {
1675    /// # use hydro_lang::prelude::*;
1676    /// # use futures::StreamExt;
1677    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1678    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1679    ///     q!(|| 1),
1680    ///     q!(|state, x| {
1681    ///         *state = *state * x;
1682    ///         if *state > 6 {
1683    ///             None // Terminate the stream
1684    ///         } else {
1685    ///             Some(-*state)
1686    ///         }
1687    ///     }),
1688    /// )
1689    /// # }, |mut stream| async move {
1690    /// // Output: -1, -2, -6
1691    /// # for w in vec![-1, -2, -6] {
1692    /// #     assert_eq!(stream.next().await.unwrap(), w);
1693    /// # }
1694    /// # }));
1695    /// # }
1696    /// ```
1697    pub fn scan<A, U, I, F>(
1698        self,
1699        init: impl IntoQuotedMut<'a, I, L>,
1700        f: impl IntoQuotedMut<'a, F, L>,
1701    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1702    where
1703        I: Fn() -> A + 'a,
1704        F: Fn(&mut A, T) -> Option<U> + 'a,
1705    {
1706        let init = init.splice_fn0_ctx(&self.location).into();
1707        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1708
1709        Stream::new(
1710            self.location.clone(),
1711            HydroNode::Scan {
1712                init,
1713                acc: f,
1714                input: Box::new(self.ir_node.into_inner()),
1715                metadata: self.location.new_node_metadata(
1716                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1717                ),
1718            },
1719        )
1720    }
1721
1722    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1723    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1724    /// until the first element in the input arrives.
1725    ///
1726    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1727    /// to depend on the order of elements in the stream.
1728    ///
1729    /// # Example
1730    /// ```rust
1731    /// # #[cfg(feature = "deploy")] {
1732    /// # use hydro_lang::prelude::*;
1733    /// # use futures::StreamExt;
1734    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735    /// let tick = process.tick();
1736    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1737    /// let batch = words.batch(&tick, nondet!(/** test */));
1738    /// batch
1739    ///     .map(q!(|x| x.to_string()))
1740    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1741    ///     .all_ticks()
1742    /// # }, |mut stream| async move {
1743    /// // "HELLOWORLD"
1744    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1745    /// # }));
1746    /// # }
1747    /// ```
1748    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1749        self,
1750        comb: impl IntoQuotedMut<'a, F, L>,
1751    ) -> Optional<T, L, B> {
1752        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1753        let core = HydroNode::Reduce {
1754            f,
1755            input: Box::new(self.ir_node.into_inner()),
1756            metadata: self
1757                .location
1758                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1759        };
1760
1761        Optional::new(self.location, core)
1762    }
1763}
1764
1765impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1766    /// Produces a new stream that interleaves the elements of the two input streams.
1767    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1768    ///
1769    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1770    /// [`Bounded`], you can use [`Stream::chain`] instead.
1771    ///
1772    /// # Example
1773    /// ```rust
1774    /// # #[cfg(feature = "deploy")] {
1775    /// # use hydro_lang::prelude::*;
1776    /// # use futures::StreamExt;
1777    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1778    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1779    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1780    /// # }, |mut stream| async move {
1781    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1782    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1783    /// #     assert_eq!(stream.next().await.unwrap(), w);
1784    /// # }
1785    /// # }));
1786    /// # }
1787    /// ```
1788    pub fn interleave<O2: Ordering, R2: Retries>(
1789        self,
1790        other: Stream<T, L, Unbounded, O2, R2>,
1791    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1792    where
1793        R: MinRetries<R2>,
1794    {
1795        Stream::new(
1796            self.location.clone(),
1797            HydroNode::Chain {
1798                first: Box::new(self.ir_node.into_inner()),
1799                second: Box::new(other.ir_node.into_inner()),
1800                metadata: self.location.new_node_metadata(Stream::<
1801                    T,
1802                    L,
1803                    Unbounded,
1804                    NoOrder,
1805                    <R as MinRetries<R2>>::Min,
1806                >::collection_kind()),
1807            },
1808        )
1809    }
1810}
1811
1812impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1813where
1814    L: Location<'a>,
1815{
1816    /// Produces a new stream that emits the input elements in sorted order.
1817    ///
1818    /// The input stream can have any ordering guarantee, but the output stream
1819    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1820    /// elements in the input stream are available, so it requires the input stream
1821    /// to be [`Bounded`].
1822    ///
1823    /// # Example
1824    /// ```rust
1825    /// # #[cfg(feature = "deploy")] {
1826    /// # use hydro_lang::prelude::*;
1827    /// # use futures::StreamExt;
1828    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1829    /// let tick = process.tick();
1830    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1831    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1832    /// batch.sort().all_ticks()
1833    /// # }, |mut stream| async move {
1834    /// // 1, 2, 3, 4
1835    /// # for w in (1..5) {
1836    /// #     assert_eq!(stream.next().await.unwrap(), w);
1837    /// # }
1838    /// # }));
1839    /// # }
1840    /// ```
1841    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1842    where
1843        T: Ord,
1844    {
1845        Stream::new(
1846            self.location.clone(),
1847            HydroNode::Sort {
1848                input: Box::new(self.ir_node.into_inner()),
1849                metadata: self
1850                    .location
1851                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1852            },
1853        )
1854    }
1855
1856    /// Produces a new stream that first emits the elements of the `self` stream,
1857    /// and then emits the elements of the `other` stream. The output stream has
1858    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1859    /// [`TotalOrder`] guarantee.
1860    ///
1861    /// Currently, both input streams must be [`Bounded`]. This operator will block
1862    /// on the first stream until all its elements are available. In a future version,
1863    /// we will relax the requirement on the `other` stream.
1864    ///
1865    /// # Example
1866    /// ```rust
1867    /// # #[cfg(feature = "deploy")] {
1868    /// # use hydro_lang::prelude::*;
1869    /// # use futures::StreamExt;
1870    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1871    /// let tick = process.tick();
1872    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1873    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1874    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1875    /// # }, |mut stream| async move {
1876    /// // 2, 3, 4, 5, 1, 2, 3, 4
1877    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1878    /// #     assert_eq!(stream.next().await.unwrap(), w);
1879    /// # }
1880    /// # }));
1881    /// # }
1882    /// ```
1883    pub fn chain<O2: Ordering, R2: Retries>(
1884        self,
1885        other: Stream<T, L, Bounded, O2, R2>,
1886    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1887    where
1888        O: MinOrder<O2>,
1889        R: MinRetries<R2>,
1890    {
1891        check_matching_location(&self.location, &other.location);
1892
1893        Stream::new(
1894            self.location.clone(),
1895            HydroNode::Chain {
1896                first: Box::new(self.ir_node.into_inner()),
1897                second: Box::new(other.ir_node.into_inner()),
1898                metadata: self.location.new_node_metadata(Stream::<
1899                    T,
1900                    L,
1901                    Bounded,
1902                    <O as MinOrder<O2>>::Min,
1903                    <R as MinRetries<R2>>::Min,
1904                >::collection_kind()),
1905            },
1906        )
1907    }
1908
1909    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1910    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1911    /// because this is compiled into a nested loop.
1912    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1913        self,
1914        other: Stream<T2, L, Bounded, O2, R>,
1915    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1916    where
1917        T: Clone,
1918        T2: Clone,
1919    {
1920        check_matching_location(&self.location, &other.location);
1921
1922        Stream::new(
1923            self.location.clone(),
1924            HydroNode::CrossProduct {
1925                left: Box::new(self.ir_node.into_inner()),
1926                right: Box::new(other.ir_node.into_inner()),
1927                metadata: self.location.new_node_metadata(Stream::<
1928                    (T, T2),
1929                    L,
1930                    Bounded,
1931                    <O2 as MinOrder<O>>::Min,
1932                    R,
1933                >::collection_kind()),
1934            },
1935        )
1936    }
1937
1938    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1939    /// `self` used as the values for *each* key.
1940    ///
1941    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1942    /// values. For example, it can be used to send the same set of elements to several cluster
1943    /// members, if the membership information is available as a [`KeyedSingleton`].
1944    ///
1945    /// # Example
1946    /// ```rust
1947    /// # #[cfg(feature = "deploy")] {
1948    /// # use hydro_lang::prelude::*;
1949    /// # use futures::StreamExt;
1950    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1951    /// # let tick = process.tick();
1952    /// let keyed_singleton = // { 1: (), 2: () }
1953    /// # process
1954    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1955    /// #     .into_keyed()
1956    /// #     .batch(&tick, nondet!(/** test */))
1957    /// #     .first();
1958    /// let stream = // [ "a", "b" ]
1959    /// # process
1960    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1961    /// #     .batch(&tick, nondet!(/** test */));
1962    /// stream.repeat_with_keys(keyed_singleton)
1963    /// # .entries().all_ticks()
1964    /// # }, |mut stream| async move {
1965    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1966    /// # let mut results = Vec::new();
1967    /// # for _ in 0..4 {
1968    /// #     results.push(stream.next().await.unwrap());
1969    /// # }
1970    /// # results.sort();
1971    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1972    /// # }));
1973    /// # }
1974    /// ```
1975    pub fn repeat_with_keys<K, V2>(
1976        self,
1977        keys: KeyedSingleton<K, V2, L, Bounded>,
1978    ) -> KeyedStream<K, T, L, Bounded, O, R>
1979    where
1980        K: Clone,
1981        T: Clone,
1982    {
1983        keys.keys()
1984            .weaken_retries()
1985            .assume_ordering_trusted::<TotalOrder>(
1986                nondet!(/** keyed stream does not depend on ordering of keys */),
1987            )
1988            .cross_product_nested_loop(self)
1989            .into_keyed()
1990    }
1991}
1992
1993impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1994where
1995    L: Location<'a>,
1996{
1997    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1998    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1999    /// by equi-joining the two streams on the key attribute `K`.
2000    ///
2001    /// # Example
2002    /// ```rust
2003    /// # #[cfg(feature = "deploy")] {
2004    /// # use hydro_lang::prelude::*;
2005    /// # use std::collections::HashSet;
2006    /// # use futures::StreamExt;
2007    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2008    /// let tick = process.tick();
2009    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2010    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2011    /// stream1.join(stream2)
2012    /// # }, |mut stream| async move {
2013    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2014    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2015    /// # stream.map(|i| assert!(expected.contains(&i)));
2016    /// # }));
2017    /// # }
2018    pub fn join<V2, O2: Ordering, R2: Retries>(
2019        self,
2020        n: Stream<(K, V2), L, B, O2, R2>,
2021    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2022    where
2023        K: Eq + Hash,
2024        R: MinRetries<R2>,
2025    {
2026        check_matching_location(&self.location, &n.location);
2027
2028        Stream::new(
2029            self.location.clone(),
2030            HydroNode::Join {
2031                left: Box::new(self.ir_node.into_inner()),
2032                right: Box::new(n.ir_node.into_inner()),
2033                metadata: self.location.new_node_metadata(Stream::<
2034                    (K, (V1, V2)),
2035                    L,
2036                    B,
2037                    NoOrder,
2038                    <R as MinRetries<R2>>::Min,
2039                >::collection_kind()),
2040            },
2041        )
2042    }
2043
2044    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2045    /// computes the anti-join of the items in the input -- i.e. returns
2046    /// unique items in the first input that do not have a matching key
2047    /// in the second input.
2048    ///
2049    /// # Example
2050    /// ```rust
2051    /// # #[cfg(feature = "deploy")] {
2052    /// # use hydro_lang::prelude::*;
2053    /// # use futures::StreamExt;
2054    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2055    /// let tick = process.tick();
2056    /// let stream = process
2057    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2058    ///   .batch(&tick, nondet!(/** test */));
2059    /// let batch = process
2060    ///   .source_iter(q!(vec![1, 2]))
2061    ///   .batch(&tick, nondet!(/** test */));
2062    /// stream.anti_join(batch).all_ticks()
2063    /// # }, |mut stream| async move {
2064    /// # for w in vec![(3, 'c'), (4, 'd')] {
2065    /// #     assert_eq!(stream.next().await.unwrap(), w);
2066    /// # }
2067    /// # }));
2068    /// # }
2069    pub fn anti_join<O2: Ordering, R2: Retries>(
2070        self,
2071        n: Stream<K, L, Bounded, O2, R2>,
2072    ) -> Stream<(K, V1), L, B, O, R>
2073    where
2074        K: Eq + Hash,
2075    {
2076        check_matching_location(&self.location, &n.location);
2077
2078        Stream::new(
2079            self.location.clone(),
2080            HydroNode::AntiJoin {
2081                pos: Box::new(self.ir_node.into_inner()),
2082                neg: Box::new(n.ir_node.into_inner()),
2083                metadata: self
2084                    .location
2085                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2086            },
2087        )
2088    }
2089}
2090
2091impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2092    Stream<(K, V), L, B, O, R>
2093{
2094    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2095    /// is used as the key and the second element is added to the entries associated with that key.
2096    ///
2097    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2098    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2099    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2100    /// total ordering _within_ each group but no ordering _across_ groups.
2101    ///
2102    /// # Example
2103    /// ```rust
2104    /// # #[cfg(feature = "deploy")] {
2105    /// # use hydro_lang::prelude::*;
2106    /// # use futures::StreamExt;
2107    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2108    /// process
2109    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2110    ///     .into_keyed()
2111    /// #   .entries()
2112    /// # }, |mut stream| async move {
2113    /// // { 1: [2, 3], 2: [4] }
2114    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2115    /// #     assert_eq!(stream.next().await.unwrap(), w);
2116    /// # }
2117    /// # }));
2118    /// # }
2119    /// ```
2120    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2121        KeyedStream::new(
2122            self.location.clone(),
2123            HydroNode::Cast {
2124                inner: Box::new(self.ir_node.into_inner()),
2125                metadata: self
2126                    .location
2127                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2128            },
2129        )
2130    }
2131}
2132
2133impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2134where
2135    K: Eq + Hash,
2136    L: Location<'a>,
2137{
2138    #[deprecated = "use .into_keyed().fold(...) instead"]
2139    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2140    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2141    /// in the second element are accumulated via the `comb` closure.
2142    ///
2143    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2144    /// to depend on the order of elements in the stream.
2145    ///
2146    /// If the input and output value types are the same and do not require initialization then use
2147    /// [`Stream::reduce_keyed`].
2148    ///
2149    /// # Example
2150    /// ```rust
2151    /// # #[cfg(feature = "deploy")] {
2152    /// # use hydro_lang::prelude::*;
2153    /// # use futures::StreamExt;
2154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2155    /// let tick = process.tick();
2156    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2157    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2158    /// batch
2159    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2160    ///     .all_ticks()
2161    /// # }, |mut stream| async move {
2162    /// // (1, 5), (2, 7)
2163    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2164    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2165    /// # }));
2166    /// # }
2167    /// ```
2168    pub fn fold_keyed<A, I, F>(
2169        self,
2170        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2171        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2172    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2173    where
2174        I: Fn() -> A + 'a,
2175        F: Fn(&mut A, V) + 'a,
2176    {
2177        self.into_keyed().fold(init, comb).entries()
2178    }
2179
2180    #[deprecated = "use .into_keyed().reduce(...) instead"]
2181    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2182    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2183    /// in the second element are accumulated via the `comb` closure.
2184    ///
2185    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2186    /// to depend on the order of elements in the stream.
2187    ///
2188    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2189    ///
2190    /// # Example
2191    /// ```rust
2192    /// # #[cfg(feature = "deploy")] {
2193    /// # use hydro_lang::prelude::*;
2194    /// # use futures::StreamExt;
2195    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2196    /// let tick = process.tick();
2197    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2198    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2199    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2200    /// # }, |mut stream| async move {
2201    /// // (1, 5), (2, 7)
2202    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2203    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2204    /// # }));
2205    /// # }
2206    /// ```
2207    pub fn reduce_keyed<F>(
2208        self,
2209        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2210    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2211    where
2212        F: Fn(&mut V, V) + 'a,
2213    {
2214        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2215
2216        Stream::new(
2217            self.location.clone(),
2218            HydroNode::ReduceKeyed {
2219                f,
2220                input: Box::new(self.ir_node.into_inner()),
2221                metadata: self.location.new_node_metadata(Stream::<
2222                    (K, V),
2223                    Tick<L>,
2224                    Bounded,
2225                    NoOrder,
2226                    ExactlyOnce,
2227                >::collection_kind()),
2228            },
2229        )
2230    }
2231}
2232
2233impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2234where
2235    K: Eq + Hash,
2236    L: Location<'a>,
2237{
2238    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2239    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2240    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2241    /// in the second element are accumulated via the `comb` closure.
2242    ///
2243    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2244    /// as there may be non-deterministic duplicates.
2245    ///
2246    /// If the input and output value types are the same and do not require initialization then use
2247    /// [`Stream::reduce_keyed_commutative_idempotent`].
2248    ///
2249    /// # Example
2250    /// ```rust
2251    /// # #[cfg(feature = "deploy")] {
2252    /// # use hydro_lang::prelude::*;
2253    /// # use futures::StreamExt;
2254    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2255    /// let tick = process.tick();
2256    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2257    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2258    /// batch
2259    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2260    ///     .all_ticks()
2261    /// # }, |mut stream| async move {
2262    /// // (1, false), (2, true)
2263    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2264    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2265    /// # }));
2266    /// # }
2267    /// ```
2268    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2269        self,
2270        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2271        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2272    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2273    where
2274        I: Fn() -> A + 'a,
2275        F: Fn(&mut A, V) + 'a,
2276    {
2277        self.into_keyed()
2278            .fold_commutative_idempotent(init, comb)
2279            .entries()
2280    }
2281
2282    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2283    /// # Example
2284    /// ```rust
2285    /// # #[cfg(feature = "deploy")] {
2286    /// # use hydro_lang::prelude::*;
2287    /// # use futures::StreamExt;
2288    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2289    /// let tick = process.tick();
2290    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2291    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2292    /// batch.keys().all_ticks()
2293    /// # }, |mut stream| async move {
2294    /// // 1, 2
2295    /// # assert_eq!(stream.next().await.unwrap(), 1);
2296    /// # assert_eq!(stream.next().await.unwrap(), 2);
2297    /// # }));
2298    /// # }
2299    /// ```
2300    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2301        self.into_keyed()
2302            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2303            .keys()
2304    }
2305
2306    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2307    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2308    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2309    /// in the second element are accumulated via the `comb` closure.
2310    ///
2311    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2312    /// as there may be non-deterministic duplicates.
2313    ///
2314    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2315    ///
2316    /// # Example
2317    /// ```rust
2318    /// # #[cfg(feature = "deploy")] {
2319    /// # use hydro_lang::prelude::*;
2320    /// # use futures::StreamExt;
2321    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2322    /// let tick = process.tick();
2323    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2324    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2325    /// batch
2326    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2327    ///     .all_ticks()
2328    /// # }, |mut stream| async move {
2329    /// // (1, false), (2, true)
2330    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2331    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2332    /// # }));
2333    /// # }
2334    /// ```
2335    pub fn reduce_keyed_commutative_idempotent<F>(
2336        self,
2337        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2338    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2339    where
2340        F: Fn(&mut V, V) + 'a,
2341    {
2342        self.into_keyed()
2343            .reduce_commutative_idempotent(comb)
2344            .entries()
2345    }
2346}
2347
2348impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2349where
2350    K: Eq + Hash,
2351    L: Location<'a>,
2352{
2353    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2354    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2355    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2356    /// in the second element are accumulated via the `comb` closure.
2357    ///
2358    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2359    ///
2360    /// If the input and output value types are the same and do not require initialization then use
2361    /// [`Stream::reduce_keyed_commutative`].
2362    ///
2363    /// # Example
2364    /// ```rust
2365    /// # #[cfg(feature = "deploy")] {
2366    /// # use hydro_lang::prelude::*;
2367    /// # use futures::StreamExt;
2368    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2369    /// let tick = process.tick();
2370    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2371    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2372    /// batch
2373    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2374    ///     .all_ticks()
2375    /// # }, |mut stream| async move {
2376    /// // (1, 5), (2, 7)
2377    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2378    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2379    /// # }));
2380    /// # }
2381    /// ```
2382    pub fn fold_keyed_commutative<A, I, F>(
2383        self,
2384        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2385        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2386    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2387    where
2388        I: Fn() -> A + 'a,
2389        F: Fn(&mut A, V) + 'a,
2390    {
2391        self.into_keyed().fold_commutative(init, comb).entries()
2392    }
2393
2394    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2395    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2396    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2397    /// in the second element are accumulated via the `comb` closure.
2398    ///
2399    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2400    ///
2401    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2402    ///
2403    /// # Example
2404    /// ```rust
2405    /// # #[cfg(feature = "deploy")] {
2406    /// # use hydro_lang::prelude::*;
2407    /// # use futures::StreamExt;
2408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2409    /// let tick = process.tick();
2410    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2411    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2412    /// batch
2413    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2414    ///     .all_ticks()
2415    /// # }, |mut stream| async move {
2416    /// // (1, 5), (2, 7)
2417    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2418    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2419    /// # }));
2420    /// # }
2421    /// ```
2422    pub fn reduce_keyed_commutative<F>(
2423        self,
2424        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2425    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2426    where
2427        F: Fn(&mut V, V) + 'a,
2428    {
2429        self.into_keyed().reduce_commutative(comb).entries()
2430    }
2431}
2432
2433impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2434where
2435    K: Eq + Hash,
2436    L: Location<'a>,
2437{
2438    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2439    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2440    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2441    /// in the second element are accumulated via the `comb` closure.
2442    ///
2443    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2444    ///
2445    /// If the input and output value types are the same and do not require initialization then use
2446    /// [`Stream::reduce_keyed_idempotent`].
2447    ///
2448    /// # Example
2449    /// ```rust
2450    /// # #[cfg(feature = "deploy")] {
2451    /// # use hydro_lang::prelude::*;
2452    /// # use futures::StreamExt;
2453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2454    /// let tick = process.tick();
2455    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2456    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2457    /// batch
2458    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2459    ///     .all_ticks()
2460    /// # }, |mut stream| async move {
2461    /// // (1, false), (2, true)
2462    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2463    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2464    /// # }));
2465    /// # }
2466    /// ```
2467    pub fn fold_keyed_idempotent<A, I, F>(
2468        self,
2469        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2470        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2471    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2472    where
2473        I: Fn() -> A + 'a,
2474        F: Fn(&mut A, V) + 'a,
2475    {
2476        self.into_keyed().fold_idempotent(init, comb).entries()
2477    }
2478
2479    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2480    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2481    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2482    /// in the second element are accumulated via the `comb` closure.
2483    ///
2484    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2485    ///
2486    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2487    ///
2488    /// # Example
2489    /// ```rust
2490    /// # #[cfg(feature = "deploy")] {
2491    /// # use hydro_lang::prelude::*;
2492    /// # use futures::StreamExt;
2493    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494    /// let tick = process.tick();
2495    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2496    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2497    /// batch
2498    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2499    ///     .all_ticks()
2500    /// # }, |mut stream| async move {
2501    /// // (1, false), (2, true)
2502    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2503    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2504    /// # }));
2505    /// # }
2506    /// ```
2507    pub fn reduce_keyed_idempotent<F>(
2508        self,
2509        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2510    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2511    where
2512        F: Fn(&mut V, V) + 'a,
2513    {
2514        self.into_keyed().reduce_idempotent(comb).entries()
2515    }
2516}
2517
2518impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2519where
2520    L: Location<'a> + NoTick,
2521{
2522    /// Returns a stream corresponding to the latest batch of elements being atomically
2523    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2524    /// the order of the input.
2525    ///
2526    /// # Non-Determinism
2527    /// The batch boundaries are non-deterministic and may change across executions.
2528    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2529        Stream::new(
2530            self.location.clone().tick,
2531            HydroNode::Batch {
2532                inner: Box::new(self.ir_node.into_inner()),
2533                metadata: self
2534                    .location
2535                    .tick
2536                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2537            },
2538        )
2539    }
2540
2541    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2542    /// See [`Stream::atomic`] for more details.
2543    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2544        Stream::new(
2545            self.location.tick.l.clone(),
2546            HydroNode::EndAtomic {
2547                inner: Box::new(self.ir_node.into_inner()),
2548                metadata: self
2549                    .location
2550                    .tick
2551                    .l
2552                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2553            },
2554        )
2555    }
2556}
2557
2558impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2559where
2560    L: Location<'a>,
2561{
2562    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2563    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2564    ///
2565    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2566    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2567    /// argument that declares where the stream will be atomically processed. Batching a stream into
2568    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2569    /// [`Tick`] will introduce asynchrony.
2570    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2571        let out_location = Atomic { tick: tick.clone() };
2572        Stream::new(
2573            out_location.clone(),
2574            HydroNode::BeginAtomic {
2575                inner: Box::new(self.ir_node.into_inner()),
2576                metadata: out_location
2577                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2578            },
2579        )
2580    }
2581
2582    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2583    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2584    /// the order of the input. The output stream will execute in the [`Tick`] that was
2585    /// used to create the atomic section.
2586    ///
2587    /// # Non-Determinism
2588    /// The batch boundaries are non-deterministic and may change across executions.
2589    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2590        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2591        Stream::new(
2592            tick.clone(),
2593            HydroNode::Batch {
2594                inner: Box::new(self.ir_node.into_inner()),
2595                metadata: tick
2596                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2597            },
2598        )
2599    }
2600
2601    /// Given a time interval, returns a stream corresponding to samples taken from the
2602    /// stream roughly at that interval. The output will have elements in the same order
2603    /// as the input, but with arbitrary elements skipped between samples. There is also
2604    /// no guarantee on the exact timing of the samples.
2605    ///
2606    /// # Non-Determinism
2607    /// The output stream is non-deterministic in which elements are sampled, since this
2608    /// is controlled by a clock.
2609    pub fn sample_every(
2610        self,
2611        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2612        nondet: NonDet,
2613    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2614    where
2615        L: NoTick + NoAtomic,
2616    {
2617        let samples = self.location.source_interval(interval, nondet);
2618
2619        let tick = self.location.tick();
2620        self.batch(&tick, nondet)
2621            .filter_if_some(samples.batch(&tick, nondet).first())
2622            .all_ticks()
2623            .weakest_retries()
2624    }
2625
2626    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2627    /// stream has not emitted a value since that duration.
2628    ///
2629    /// # Non-Determinism
2630    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2631    /// samples take place, timeouts may be non-deterministically generated or missed,
2632    /// and the notification of the timeout may be delayed as well. There is also no
2633    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2634    /// detected based on when the next sample is taken.
2635    pub fn timeout(
2636        self,
2637        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2638        nondet: NonDet,
2639    ) -> Optional<(), L, Unbounded>
2640    where
2641        L: NoTick + NoAtomic,
2642    {
2643        let tick = self.location.tick();
2644
2645        let latest_received = self.assume_retries(nondet).fold_commutative(
2646            q!(|| None),
2647            q!(|latest, _| {
2648                *latest = Some(Instant::now());
2649            }),
2650        );
2651
2652        latest_received
2653            .snapshot(&tick, nondet)
2654            .filter_map(q!(move |latest_received| {
2655                if let Some(latest_received) = latest_received {
2656                    if Instant::now().duration_since(latest_received) > duration {
2657                        Some(())
2658                    } else {
2659                        None
2660                    }
2661                } else {
2662                    Some(())
2663                }
2664            }))
2665            .latest()
2666    }
2667}
2668
2669impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2670where
2671    L: Location<'a> + NoTick + NoAtomic,
2672    F: Future<Output = T>,
2673{
2674    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2675    /// Future outputs are produced as available, regardless of input arrival order.
2676    ///
2677    /// # Example
2678    /// ```rust
2679    /// # #[cfg(feature = "deploy")] {
2680    /// # use std::collections::HashSet;
2681    /// # use futures::StreamExt;
2682    /// # use hydro_lang::prelude::*;
2683    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2684    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2685    ///     .map(q!(|x| async move {
2686    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2687    ///         x
2688    ///     }))
2689    ///     .resolve_futures()
2690    /// #   },
2691    /// #   |mut stream| async move {
2692    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2693    /// #       let mut output = HashSet::new();
2694    /// #       for _ in 1..10 {
2695    /// #           output.insert(stream.next().await.unwrap());
2696    /// #       }
2697    /// #       assert_eq!(
2698    /// #           output,
2699    /// #           HashSet::<i32>::from_iter(1..10)
2700    /// #       );
2701    /// #   },
2702    /// # ));
2703    /// # }
2704    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2705        Stream::new(
2706            self.location.clone(),
2707            HydroNode::ResolveFutures {
2708                input: Box::new(self.ir_node.into_inner()),
2709                metadata: self
2710                    .location
2711                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2712            },
2713        )
2714    }
2715
2716    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2717    /// Future outputs are produced in the same order as the input stream.
2718    ///
2719    /// # Example
2720    /// ```rust
2721    /// # #[cfg(feature = "deploy")] {
2722    /// # use std::collections::HashSet;
2723    /// # use futures::StreamExt;
2724    /// # use hydro_lang::prelude::*;
2725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2726    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2727    ///     .map(q!(|x| async move {
2728    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2729    ///         x
2730    ///     }))
2731    ///     .resolve_futures_ordered()
2732    /// #   },
2733    /// #   |mut stream| async move {
2734    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2735    /// #       let mut output = Vec::new();
2736    /// #       for _ in 1..10 {
2737    /// #           output.push(stream.next().await.unwrap());
2738    /// #       }
2739    /// #       assert_eq!(
2740    /// #           output,
2741    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2742    /// #       );
2743    /// #   },
2744    /// # ));
2745    /// # }
2746    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2747        Stream::new(
2748            self.location.clone(),
2749            HydroNode::ResolveFuturesOrdered {
2750                input: Box::new(self.ir_node.into_inner()),
2751                metadata: self
2752                    .location
2753                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2754            },
2755        )
2756    }
2757}
2758
2759impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2760where
2761    L: Location<'a> + NoTick,
2762{
2763    /// Executes the provided closure for every element in this stream.
2764    ///
2765    /// Because the closure may have side effects, the stream must have deterministic order
2766    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2767    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2768    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2769    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2770        let f = f.splice_fn1_ctx(&self.location).into();
2771        self.location
2772            .flow_state()
2773            .borrow_mut()
2774            .push_root(HydroRoot::ForEach {
2775                input: Box::new(self.ir_node.into_inner()),
2776                f,
2777                op_metadata: HydroIrOpMetadata::new(),
2778            });
2779    }
2780
2781    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2782    /// TCP socket to some other server. You should _not_ use this API for interacting with
2783    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2784    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2785    /// interaction with asynchronous sinks.
2786    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2787    where
2788        S: 'a + futures::Sink<T> + Unpin,
2789    {
2790        self.location
2791            .flow_state()
2792            .borrow_mut()
2793            .push_root(HydroRoot::DestSink {
2794                sink: sink.splice_typed_ctx(&self.location).into(),
2795                input: Box::new(self.ir_node.into_inner()),
2796                op_metadata: HydroIrOpMetadata::new(),
2797            });
2798    }
2799}
2800
2801impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2802where
2803    L: Location<'a>,
2804{
2805    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2806    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2807    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2808        Stream::new(
2809            self.location.outer().clone(),
2810            HydroNode::YieldConcat {
2811                inner: Box::new(self.ir_node.into_inner()),
2812                metadata: self
2813                    .location
2814                    .outer()
2815                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2816            },
2817        )
2818    }
2819
2820    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2821    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2822    ///
2823    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2824    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2825    /// stream's [`Tick`] context.
2826    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2827        let out_location = Atomic {
2828            tick: self.location.clone(),
2829        };
2830
2831        Stream::new(
2832            out_location.clone(),
2833            HydroNode::YieldConcat {
2834                inner: Box::new(self.ir_node.into_inner()),
2835                metadata: out_location
2836                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2837            },
2838        )
2839    }
2840
2841    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2842    ///
2843    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2844    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2845    /// careful when using this operator, as its memory usage will grow linearly over time since it
2846    /// must store its inputs indefinitely.
2847    ///
2848    /// # Example
2849    /// ```rust
2850    /// # #[cfg(feature = "deploy")] {
2851    /// # use hydro_lang::prelude::*;
2852    /// # use futures::StreamExt;
2853    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2854    /// let tick = process.tick();
2855    /// // ticks are lazy by default, forces the second tick to run
2856    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2857    ///
2858    /// let batch_first_tick = process
2859    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2860    ///   .batch(&tick, nondet!(/** test */));
2861    /// let batch_second_tick = process
2862    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2863    ///   .batch(&tick, nondet!(/** test */))
2864    ///   .defer_tick(); // appears on the second tick
2865    /// batch_first_tick.chain(batch_second_tick)
2866    ///   .persist()
2867    ///   .all_ticks()
2868    /// # }, |mut stream| async move {
2869    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2870    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2871    /// #     assert_eq!(stream.next().await.unwrap(), w);
2872    /// # }
2873    /// # }));
2874    /// # }
2875    /// ```
2876    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2877    where
2878        T: Clone,
2879    {
2880        Stream::new(
2881            self.location.clone(),
2882            HydroNode::Persist {
2883                inner: Box::new(self.ir_node.into_inner()),
2884                metadata: self
2885                    .location
2886                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2887            },
2888        )
2889    }
2890
2891    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2892    /// always has the elements of `self` at tick `T - 1`.
2893    ///
2894    /// At tick `0`, the output stream is empty, since there is no previous tick.
2895    ///
2896    /// This operator enables stateful iterative processing with ticks, by sending data from one
2897    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2898    ///
2899    /// # Example
2900    /// ```rust
2901    /// # #[cfg(feature = "deploy")] {
2902    /// # use hydro_lang::prelude::*;
2903    /// # use futures::StreamExt;
2904    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2905    /// let tick = process.tick();
2906    /// // ticks are lazy by default, forces the second tick to run
2907    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2908    ///
2909    /// let batch_first_tick = process
2910    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2911    ///   .batch(&tick, nondet!(/** test */));
2912    /// let batch_second_tick = process
2913    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2914    ///   .batch(&tick, nondet!(/** test */))
2915    ///   .defer_tick(); // appears on the second tick
2916    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2917    ///
2918    /// changes_across_ticks.clone().filter_not_in(
2919    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2920    /// ).all_ticks()
2921    /// # }, |mut stream| async move {
2922    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2923    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2924    /// #     assert_eq!(stream.next().await.unwrap(), w);
2925    /// # }
2926    /// # }));
2927    /// # }
2928    /// ```
2929    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2930        Stream::new(
2931            self.location.clone(),
2932            HydroNode::DeferTick {
2933                input: Box::new(self.ir_node.into_inner()),
2934                metadata: self
2935                    .location
2936                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2937            },
2938        )
2939    }
2940}
2941
2942#[cfg(test)]
2943mod tests {
2944    #[cfg(feature = "deploy")]
2945    use futures::{SinkExt, StreamExt};
2946    #[cfg(feature = "deploy")]
2947    use hydro_deploy::Deployment;
2948    #[cfg(feature = "deploy")]
2949    use serde::{Deserialize, Serialize};
2950    #[cfg(feature = "deploy")]
2951    use stageleft::q;
2952
2953    #[cfg(any(feature = "deploy", feature = "sim"))]
2954    use crate::compile::builder::FlowBuilder;
2955    #[cfg(feature = "deploy")]
2956    use crate::live_collections::stream::ExactlyOnce;
2957    #[cfg(feature = "sim")]
2958    use crate::live_collections::stream::NoOrder;
2959    #[cfg(any(feature = "deploy", feature = "sim"))]
2960    use crate::live_collections::stream::TotalOrder;
2961    #[cfg(any(feature = "deploy", feature = "sim"))]
2962    use crate::location::Location;
2963    #[cfg(any(feature = "deploy", feature = "sim"))]
2964    use crate::nondet::nondet;
2965
2966    mod backtrace_chained_ops;
2967
2968    #[cfg(feature = "deploy")]
2969    struct P1 {}
2970    #[cfg(feature = "deploy")]
2971    struct P2 {}
2972
2973    #[cfg(feature = "deploy")]
2974    #[derive(Serialize, Deserialize, Debug)]
2975    struct SendOverNetwork {
2976        n: u32,
2977    }
2978
2979    #[cfg(feature = "deploy")]
2980    #[tokio::test]
2981    async fn first_ten_distributed() {
2982        let mut deployment = Deployment::new();
2983
2984        let flow = FlowBuilder::new();
2985        let first_node = flow.process::<P1>();
2986        let second_node = flow.process::<P2>();
2987        let external = flow.external::<P2>();
2988
2989        let numbers = first_node.source_iter(q!(0..10));
2990        let out_port = numbers
2991            .map(q!(|n| SendOverNetwork { n }))
2992            .send_bincode(&second_node)
2993            .send_bincode_external(&external);
2994
2995        let nodes = flow
2996            .with_process(&first_node, deployment.Localhost())
2997            .with_process(&second_node, deployment.Localhost())
2998            .with_external(&external, deployment.Localhost())
2999            .deploy(&mut deployment);
3000
3001        deployment.deploy().await.unwrap();
3002
3003        let mut external_out = nodes.connect(out_port).await;
3004
3005        deployment.start().await.unwrap();
3006
3007        for i in 0..10 {
3008            assert_eq!(external_out.next().await.unwrap().n, i);
3009        }
3010    }
3011
3012    #[cfg(feature = "deploy")]
3013    #[tokio::test]
3014    async fn first_cardinality() {
3015        let mut deployment = Deployment::new();
3016
3017        let flow = FlowBuilder::new();
3018        let node = flow.process::<()>();
3019        let external = flow.external::<()>();
3020
3021        let node_tick = node.tick();
3022        let count = node_tick
3023            .singleton(q!([1, 2, 3]))
3024            .into_stream()
3025            .flatten_ordered()
3026            .first()
3027            .into_stream()
3028            .count()
3029            .all_ticks()
3030            .send_bincode_external(&external);
3031
3032        let nodes = flow
3033            .with_process(&node, deployment.Localhost())
3034            .with_external(&external, deployment.Localhost())
3035            .deploy(&mut deployment);
3036
3037        deployment.deploy().await.unwrap();
3038
3039        let mut external_out = nodes.connect(count).await;
3040
3041        deployment.start().await.unwrap();
3042
3043        assert_eq!(external_out.next().await.unwrap(), 1);
3044    }
3045
3046    #[cfg(feature = "deploy")]
3047    #[tokio::test]
3048    async fn unbounded_reduce_remembers_state() {
3049        let mut deployment = Deployment::new();
3050
3051        let flow = FlowBuilder::new();
3052        let node = flow.process::<()>();
3053        let external = flow.external::<()>();
3054
3055        let (input_port, input) = node.source_external_bincode(&external);
3056        let out = input
3057            .reduce(q!(|acc, v| *acc += v))
3058            .sample_eager(nondet!(/** test */))
3059            .send_bincode_external(&external);
3060
3061        let nodes = flow
3062            .with_process(&node, deployment.Localhost())
3063            .with_external(&external, deployment.Localhost())
3064            .deploy(&mut deployment);
3065
3066        deployment.deploy().await.unwrap();
3067
3068        let mut external_in = nodes.connect(input_port).await;
3069        let mut external_out = nodes.connect(out).await;
3070
3071        deployment.start().await.unwrap();
3072
3073        external_in.send(1).await.unwrap();
3074        assert_eq!(external_out.next().await.unwrap(), 1);
3075
3076        external_in.send(2).await.unwrap();
3077        assert_eq!(external_out.next().await.unwrap(), 3);
3078    }
3079
3080    #[cfg(feature = "deploy")]
3081    #[tokio::test]
3082    async fn atomic_fold_replays_each_tick() {
3083        let mut deployment = Deployment::new();
3084
3085        let flow = FlowBuilder::new();
3086        let node = flow.process::<()>();
3087        let external = flow.external::<()>();
3088
3089        let (input_port, input) =
3090            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3091        let tick = node.tick();
3092
3093        let out = input
3094            .batch(&tick, nondet!(/** test */))
3095            .cross_singleton(
3096                node.source_iter(q!(vec![1, 2, 3]))
3097                    .atomic(&tick)
3098                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3099                    .snapshot_atomic(nondet!(/** test */)),
3100            )
3101            .all_ticks()
3102            .send_bincode_external(&external);
3103
3104        let nodes = flow
3105            .with_process(&node, deployment.Localhost())
3106            .with_external(&external, deployment.Localhost())
3107            .deploy(&mut deployment);
3108
3109        deployment.deploy().await.unwrap();
3110
3111        let mut external_in = nodes.connect(input_port).await;
3112        let mut external_out = nodes.connect(out).await;
3113
3114        deployment.start().await.unwrap();
3115
3116        external_in.send(1).await.unwrap();
3117        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3118
3119        external_in.send(2).await.unwrap();
3120        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3121    }
3122
3123    #[cfg(feature = "deploy")]
3124    #[tokio::test]
3125    async fn unbounded_scan_remembers_state() {
3126        let mut deployment = Deployment::new();
3127
3128        let flow = FlowBuilder::new();
3129        let node = flow.process::<()>();
3130        let external = flow.external::<()>();
3131
3132        let (input_port, input) = node.source_external_bincode(&external);
3133        let out = input
3134            .scan(
3135                q!(|| 0),
3136                q!(|acc, v| {
3137                    *acc += v;
3138                    Some(*acc)
3139                }),
3140            )
3141            .send_bincode_external(&external);
3142
3143        let nodes = flow
3144            .with_process(&node, deployment.Localhost())
3145            .with_external(&external, deployment.Localhost())
3146            .deploy(&mut deployment);
3147
3148        deployment.deploy().await.unwrap();
3149
3150        let mut external_in = nodes.connect(input_port).await;
3151        let mut external_out = nodes.connect(out).await;
3152
3153        deployment.start().await.unwrap();
3154
3155        external_in.send(1).await.unwrap();
3156        assert_eq!(external_out.next().await.unwrap(), 1);
3157
3158        external_in.send(2).await.unwrap();
3159        assert_eq!(external_out.next().await.unwrap(), 3);
3160    }
3161
3162    #[cfg(feature = "deploy")]
3163    #[tokio::test]
3164    async fn unbounded_enumerate_remembers_state() {
3165        let mut deployment = Deployment::new();
3166
3167        let flow = FlowBuilder::new();
3168        let node = flow.process::<()>();
3169        let external = flow.external::<()>();
3170
3171        let (input_port, input) = node.source_external_bincode(&external);
3172        let out = input.enumerate().send_bincode_external(&external);
3173
3174        let nodes = flow
3175            .with_process(&node, deployment.Localhost())
3176            .with_external(&external, deployment.Localhost())
3177            .deploy(&mut deployment);
3178
3179        deployment.deploy().await.unwrap();
3180
3181        let mut external_in = nodes.connect(input_port).await;
3182        let mut external_out = nodes.connect(out).await;
3183
3184        deployment.start().await.unwrap();
3185
3186        external_in.send(1).await.unwrap();
3187        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3188
3189        external_in.send(2).await.unwrap();
3190        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3191    }
3192
3193    #[cfg(feature = "deploy")]
3194    #[tokio::test]
3195    async fn unbounded_unique_remembers_state() {
3196        let mut deployment = Deployment::new();
3197
3198        let flow = FlowBuilder::new();
3199        let node = flow.process::<()>();
3200        let external = flow.external::<()>();
3201
3202        let (input_port, input) =
3203            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3204        let out = input.unique().send_bincode_external(&external);
3205
3206        let nodes = flow
3207            .with_process(&node, deployment.Localhost())
3208            .with_external(&external, deployment.Localhost())
3209            .deploy(&mut deployment);
3210
3211        deployment.deploy().await.unwrap();
3212
3213        let mut external_in = nodes.connect(input_port).await;
3214        let mut external_out = nodes.connect(out).await;
3215
3216        deployment.start().await.unwrap();
3217
3218        external_in.send(1).await.unwrap();
3219        assert_eq!(external_out.next().await.unwrap(), 1);
3220
3221        external_in.send(2).await.unwrap();
3222        assert_eq!(external_out.next().await.unwrap(), 2);
3223
3224        external_in.send(1).await.unwrap();
3225        external_in.send(3).await.unwrap();
3226        assert_eq!(external_out.next().await.unwrap(), 3);
3227    }
3228
3229    #[cfg(feature = "sim")]
3230    #[test]
3231    #[should_panic]
3232    fn sim_batch_nondet_size() {
3233        let flow = FlowBuilder::new();
3234        let node = flow.process::<()>();
3235
3236        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3237
3238        let tick = node.tick();
3239        let out_recv = input
3240            .batch(&tick, nondet!(/** test */))
3241            .count()
3242            .all_ticks()
3243            .sim_output();
3244
3245        flow.sim().exhaustive(async || {
3246            in_send.send(());
3247            in_send.send(());
3248            in_send.send(());
3249
3250            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3251        });
3252    }
3253
3254    #[cfg(feature = "sim")]
3255    #[test]
3256    fn sim_batch_preserves_order() {
3257        let flow = FlowBuilder::new();
3258        let node = flow.process::<()>();
3259
3260        let (in_send, input) = node.sim_input();
3261
3262        let tick = node.tick();
3263        let out_recv = input
3264            .batch(&tick, nondet!(/** test */))
3265            .all_ticks()
3266            .sim_output();
3267
3268        flow.sim().exhaustive(async || {
3269            in_send.send(1);
3270            in_send.send(2);
3271            in_send.send(3);
3272
3273            out_recv.assert_yields_only([1, 2, 3]).await;
3274        });
3275    }
3276
3277    #[cfg(feature = "sim")]
3278    #[test]
3279    #[should_panic]
3280    fn sim_batch_unordered_shuffles() {
3281        let flow = FlowBuilder::new();
3282        let node = flow.process::<()>();
3283
3284        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3285
3286        let tick = node.tick();
3287        let batch = input.batch(&tick, nondet!(/** test */));
3288        let out_recv = batch
3289            .clone()
3290            .min()
3291            .zip(batch.max())
3292            .all_ticks()
3293            .sim_output();
3294
3295        flow.sim().exhaustive(async || {
3296            in_send.send_many_unordered([1, 2, 3]);
3297
3298            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3299                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3300            }
3301        });
3302    }
3303
3304    #[cfg(feature = "sim")]
3305    #[test]
3306    fn sim_batch_unordered_shuffles_count() {
3307        let flow = FlowBuilder::new();
3308        let node = flow.process::<()>();
3309
3310        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3311
3312        let tick = node.tick();
3313        let batch = input.batch(&tick, nondet!(/** test */));
3314        let out_recv = batch.all_ticks().sim_output();
3315
3316        let instance_count = flow.sim().exhaustive(async || {
3317            in_send.send_many_unordered([1, 2, 3, 4]);
3318            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3319        });
3320
3321        assert_eq!(
3322            instance_count,
3323            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3324        )
3325    }
3326
3327    #[cfg(feature = "sim")]
3328    #[test]
3329    #[should_panic]
3330    fn sim_observe_order_batched() {
3331        let flow = FlowBuilder::new();
3332        let node = flow.process::<()>();
3333
3334        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3335
3336        let tick = node.tick();
3337        let batch = input.batch(&tick, nondet!(/** test */));
3338        let out_recv = batch
3339            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3340            .all_ticks()
3341            .sim_output();
3342
3343        flow.sim().exhaustive(async || {
3344            in_send.send_many_unordered([1, 2, 3, 4]);
3345            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3346        });
3347    }
3348
3349    #[cfg(feature = "sim")]
3350    #[test]
3351    fn sim_observe_order_batched_count() {
3352        let flow = FlowBuilder::new();
3353        let node = flow.process::<()>();
3354
3355        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3356
3357        let tick = node.tick();
3358        let batch = input.batch(&tick, nondet!(/** test */));
3359        let out_recv = batch
3360            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3361            .all_ticks()
3362            .sim_output();
3363
3364        let instance_count = flow.sim().exhaustive(async || {
3365            in_send.send_many_unordered([1, 2, 3, 4]);
3366            let _ = out_recv.collect::<Vec<_>>().await;
3367        });
3368
3369        assert_eq!(
3370            instance_count,
3371            192 // 4! * 2^{4 - 1}
3372        )
3373    }
3374
3375    #[cfg(feature = "sim")]
3376    #[test]
3377    fn sim_unordered_count_instance_count() {
3378        let flow = FlowBuilder::new();
3379        let node = flow.process::<()>();
3380
3381        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3382
3383        let tick = node.tick();
3384        let out_recv = input
3385            .count()
3386            .snapshot(&tick, nondet!(/** test */))
3387            .all_ticks()
3388            .sim_output();
3389
3390        let instance_count = flow.sim().exhaustive(async || {
3391            in_send.send_many_unordered([1, 2, 3, 4]);
3392            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3393        });
3394
3395        assert_eq!(
3396            instance_count,
3397            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3398        )
3399    }
3400}