hydro_lang/live_collections/stream/mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
33#[sealed::sealed]
34pub trait Ordering:
35    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
36{
37    /// The [`StreamOrder`] corresponding to this type.
38    const ORDERING_KIND: StreamOrder;
39}
40
41/// Marks the stream as being totally ordered, which means that there are
42/// no sources of non-determinism (other than intentional ones) that will
43/// affect the order of elements.
44pub enum TotalOrder {}
45
46#[sealed::sealed]
47impl Ordering for TotalOrder {
48    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
49}
50
51/// Marks the stream as having no order, which means that the order of
52/// elements may be affected by non-determinism.
53///
54/// This restricts certain operators, such as `fold` and `reduce`, to only
55/// be used with commutative aggregation functions.
56pub enum NoOrder {}
57
58#[sealed::sealed]
59impl Ordering for NoOrder {
60    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
61}
62
63/// Helper trait for determining the weakest of two orderings.
64#[sealed::sealed]
65pub trait MinOrder<Other: ?Sized> {
66    /// The weaker of the two orderings.
67    type Min: Ordering;
68}
69
70#[sealed::sealed]
71impl MinOrder<NoOrder> for TotalOrder {
72    type Min = NoOrder;
73}
74
75#[sealed::sealed]
76impl MinOrder<TotalOrder> for TotalOrder {
77    type Min = TotalOrder;
78}
79
80#[sealed::sealed]
81impl MinOrder<TotalOrder> for NoOrder {
82    type Min = NoOrder;
83}
84
85#[sealed::sealed]
86impl MinOrder<NoOrder> for NoOrder {
87    type Min = NoOrder;
88}
89
90/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
91#[sealed::sealed]
92pub trait Retries:
93    MinRetries<Self, Min = Self>
94    + MinRetries<ExactlyOnce, Min = Self>
95    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
96{
97    /// The [`StreamRetry`] corresponding to this type.
98    const RETRIES_KIND: StreamRetry;
99}
100
101/// Marks the stream as having deterministic message cardinality, with no
102/// possibility of duplicates.
103pub enum ExactlyOnce {}
104
105#[sealed::sealed]
106impl Retries for ExactlyOnce {
107    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
108}
109
110/// Marks the stream as having non-deterministic message cardinality, which
111/// means that duplicates may occur, but messages will not be dropped.
112pub enum AtLeastOnce {}
113
114#[sealed::sealed]
115impl Retries for AtLeastOnce {
116    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
117}
118
119/// Helper trait for determining the weakest of two retry guarantees.
120#[sealed::sealed]
121pub trait MinRetries<Other: ?Sized> {
122    /// The weaker of the two retry guarantees.
123    type Min: Retries;
124}
125
126#[sealed::sealed]
127impl MinRetries<AtLeastOnce> for ExactlyOnce {
128    type Min = AtLeastOnce;
129}
130
131#[sealed::sealed]
132impl MinRetries<ExactlyOnce> for ExactlyOnce {
133    type Min = ExactlyOnce;
134}
135
136#[sealed::sealed]
137impl MinRetries<ExactlyOnce> for AtLeastOnce {
138    type Min = AtLeastOnce;
139}
140
141#[sealed::sealed]
142impl MinRetries<AtLeastOnce> for AtLeastOnce {
143    type Min = AtLeastOnce;
144}
145
146/// Streaming sequence of elements with type `Type`.
147///
148/// This live collection represents a growing sequence of elements, with new elements being
149/// asynchronously appended to the end of the sequence. This can be used to model the arrival
150/// of network input, such as API requests, or streaming ingestion.
151///
152/// By default, all streams have deterministic ordering and each element is materialized exactly
153/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
154/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
155/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
156///
157/// Type Parameters:
158/// - `Type`: the type of elements in the stream
159/// - `Loc`: the location where the stream is being materialized
160/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
161/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
162///   (default is [`TotalOrder`])
163/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
164///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
165pub struct Stream<
166    Type,
167    Loc,
168    Bound: Boundedness,
169    Order: Ordering = TotalOrder,
170    Retry: Retries = ExactlyOnce,
171> {
172    pub(crate) location: Loc,
173    pub(crate) ir_node: RefCell<HydroNode>,
174
175    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
176}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179    for Stream<T, L, Unbounded, O, R>
180where
181    L: Location<'a>,
182{
183    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184        Stream {
185            location: stream.location,
186            ir_node: stream.ir_node,
187            _phantom: PhantomData,
188        }
189    }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193    for Stream<T, L, B, NoOrder, R>
194where
195    L: Location<'a>,
196{
197    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198        Stream {
199            location: stream.location,
200            ir_node: stream.ir_node,
201            _phantom: PhantomData,
202        }
203    }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207    for Stream<T, L, B, O, AtLeastOnce>
208where
209    L: Location<'a>,
210{
211    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212        Stream {
213            location: stream.location,
214            ir_node: stream.ir_node,
215            _phantom: PhantomData,
216        }
217    }
218}
219
220impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        Stream::defer_tick(self)
226    }
227}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230    for Stream<T, Tick<L>, Bounded, O, R>
231where
232    L: Location<'a>,
233{
234    type Location = Tick<L>;
235
236    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237        Stream::new(
238            location.clone(),
239            HydroNode::CycleSource {
240                ident,
241                metadata: location.new_node_metadata(Self::collection_kind()),
242            },
243        )
244    }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248    for Stream<T, Tick<L>, Bounded, O, R>
249where
250    L: Location<'a>,
251{
252    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253        assert_eq!(
254            Location::id(&self.location),
255            expected_location,
256            "locations do not match"
257        );
258        self.location
259            .flow_state()
260            .borrow_mut()
261            .push_root(HydroRoot::CycleSink {
262                ident,
263                input: Box::new(self.ir_node.into_inner()),
264                op_metadata: HydroIrOpMetadata::new(),
265            });
266    }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270    for Stream<T, L, B, O, R>
271where
272    L: Location<'a> + NoTick,
273{
274    type Location = L;
275
276    fn create_source(ident: syn::Ident, location: L) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                ident,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288    for Stream<T, L, B, O, R>
289where
290    L: Location<'a> + NoTick,
291{
292    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293        assert_eq!(
294            Location::id(&self.location),
295            expected_location,
296            "locations do not match"
297        );
298        self.location
299            .flow_state()
300            .borrow_mut()
301            .push_root(HydroRoot::CycleSink {
302                ident,
303                input: Box::new(self.ir_node.into_inner()),
304                op_metadata: HydroIrOpMetadata::new(),
305            });
306    }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311    T: Clone,
312    L: Location<'a>,
313{
314    fn clone(&self) -> Self {
315        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317            *self.ir_node.borrow_mut() = HydroNode::Tee {
318                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319                metadata: self.location.new_node_metadata(Self::collection_kind()),
320            };
321        }
322
323        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324            Stream {
325                location: self.location.clone(),
326                ir_node: HydroNode::Tee {
327                    inner: TeeNode(inner.0.clone()),
328                    metadata: metadata.clone(),
329                }
330                .into(),
331                _phantom: PhantomData,
332            }
333        } else {
334            unreachable!()
335        }
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341    L: Location<'a>,
342{
343    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347        Stream {
348            location,
349            ir_node: RefCell::new(ir_node),
350            _phantom: PhantomData,
351        }
352    }
353
354    /// Returns the [`Location`] where this stream is being materialized.
355    pub fn location(&self) -> &L {
356        &self.location
357    }
358
359    pub(crate) fn collection_kind() -> CollectionKind {
360        CollectionKind::Stream {
361            bound: B::BOUND_KIND,
362            order: O::ORDERING_KIND,
363            retry: R::RETRIES_KIND,
364            element_type: stageleft::quote_type::<T>().into(),
365        }
366    }
367
368    /// Produces a stream based on invoking `f` on each element.
369    /// If you do not want to modify the stream and instead only want to view
370    /// each item use [`Stream::inspect`] instead.
371    ///
372    /// # Example
373    /// ```rust
374    /// # use hydro_lang::prelude::*;
375    /// # use futures::StreamExt;
376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377    /// let words = process.source_iter(q!(vec!["hello", "world"]));
378    /// words.map(q!(|x| x.to_uppercase()))
379    /// # }, |mut stream| async move {
380    /// # for w in vec!["HELLO", "WORLD"] {
381    /// #     assert_eq!(stream.next().await.unwrap(), w);
382    /// # }
383    /// # }));
384    /// ```
385    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386    where
387        F: Fn(T) -> U + 'a,
388    {
389        let f = f.splice_fn1_ctx(&self.location).into();
390        Stream::new(
391            self.location.clone(),
392            HydroNode::Map {
393                f,
394                input: Box::new(self.ir_node.into_inner()),
395                metadata: self
396                    .location
397                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398            },
399        )
400    }
401
402    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404    /// for the output type `U` must produce items in a **deterministic** order.
405    ///
406    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408    ///
409    /// # Example
410    /// ```rust
411    /// # use hydro_lang::prelude::*;
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414    /// process
415    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416    ///     .flat_map_ordered(q!(|x| x))
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, 4
419    /// # for w in (1..5) {
420    /// #     assert_eq!(stream.next().await.unwrap(), w);
421    /// # }
422    /// # }));
423    /// ```
424    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425    where
426        I: IntoIterator<Item = U>,
427        F: Fn(T) -> I + 'a,
428    {
429        let f = f.splice_fn1_ctx(&self.location).into();
430        Stream::new(
431            self.location.clone(),
432            HydroNode::FlatMap {
433                f,
434                input: Box::new(self.ir_node.into_inner()),
435                metadata: self
436                    .location
437                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438            },
439        )
440    }
441
442    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443    /// for the output type `U` to produce items in any order.
444    ///
445    /// # Example
446    /// ```rust
447    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448    /// # use futures::StreamExt;
449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450    /// process
451    ///     .source_iter(q!(vec![
452    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453    ///         std::collections::HashSet::from_iter(vec![3, 4]),
454    ///     ]))
455    ///     .flat_map_unordered(q!(|x| x))
456    /// # }, |mut stream| async move {
457    /// // 1, 2, 3, 4, but in no particular order
458    /// # let mut results = Vec::new();
459    /// # for w in (1..5) {
460    /// #     results.push(stream.next().await.unwrap());
461    /// # }
462    /// # results.sort();
463    /// # assert_eq!(results, vec![1, 2, 3, 4]);
464    /// # }));
465    /// ```
466    pub fn flat_map_unordered<U, I, F>(
467        self,
468        f: impl IntoQuotedMut<'a, F, L>,
469    ) -> Stream<U, L, B, NoOrder, R>
470    where
471        I: IntoIterator<Item = U>,
472        F: Fn(T) -> I + 'a,
473    {
474        let f = f.splice_fn1_ctx(&self.location).into();
475        Stream::new(
476            self.location.clone(),
477            HydroNode::FlatMap {
478                f,
479                input: Box::new(self.ir_node.into_inner()),
480                metadata: self
481                    .location
482                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483            },
484        )
485    }
486
487    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
488    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
489    ///
490    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
491    /// not deterministic, use [`Stream::flatten_unordered`] instead.
492    ///
493    /// ```rust
494    /// # use hydro_lang::prelude::*;
495    /// # use futures::StreamExt;
496    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
497    /// process
498    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
499    ///     .flatten_ordered()
500    /// # }, |mut stream| async move {
501    /// // 1, 2, 3, 4
502    /// # for w in (1..5) {
503    /// #     assert_eq!(stream.next().await.unwrap(), w);
504    /// # }
505    /// # }));
506    /// ```
507    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
508    where
509        T: IntoIterator<Item = U>,
510    {
511        self.flat_map_ordered(q!(|d| d))
512    }
513
514    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
515    /// for the element type `T` to produce items in any order.
516    ///
517    /// # Example
518    /// ```rust
519    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520    /// # use futures::StreamExt;
521    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
522    /// process
523    ///     .source_iter(q!(vec![
524    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
525    ///         std::collections::HashSet::from_iter(vec![3, 4]),
526    ///     ]))
527    ///     .flatten_unordered()
528    /// # }, |mut stream| async move {
529    /// // 1, 2, 3, 4, but in no particular order
530    /// # let mut results = Vec::new();
531    /// # for w in (1..5) {
532    /// #     results.push(stream.next().await.unwrap());
533    /// # }
534    /// # results.sort();
535    /// # assert_eq!(results, vec![1, 2, 3, 4]);
536    /// # }));
537    /// ```
538    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
539    where
540        T: IntoIterator<Item = U>,
541    {
542        self.flat_map_unordered(q!(|d| d))
543    }
544
545    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546    /// `f`, preserving the order of the elements.
547    ///
548    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549    /// not modify or take ownership of the values. If you need to modify the values while filtering
550    /// use [`Stream::filter_map`] instead.
551    ///
552    /// # Example
553    /// ```rust
554    /// # use hydro_lang::prelude::*;
555    /// # use futures::StreamExt;
556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557    /// process
558    ///     .source_iter(q!(vec![1, 2, 3, 4]))
559    ///     .filter(q!(|&x| x > 2))
560    /// # }, |mut stream| async move {
561    /// // 3, 4
562    /// # for w in (3..5) {
563    /// #     assert_eq!(stream.next().await.unwrap(), w);
564    /// # }
565    /// # }));
566    /// ```
567    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568    where
569        F: Fn(&T) -> bool + 'a,
570    {
571        let f = f.splice_fn1_borrow_ctx(&self.location).into();
572        Stream::new(
573            self.location.clone(),
574            HydroNode::Filter {
575                f,
576                input: Box::new(self.ir_node.into_inner()),
577                metadata: self.location.new_node_metadata(Self::collection_kind()),
578            },
579        )
580    }
581
582    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583    ///
584    /// # Example
585    /// ```rust
586    /// # use hydro_lang::prelude::*;
587    /// # use futures::StreamExt;
588    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589    /// process
590    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
591    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
592    /// # }, |mut stream| async move {
593    /// // 1, 2
594    /// # for w in (1..3) {
595    /// #     assert_eq!(stream.next().await.unwrap(), w);
596    /// # }
597    /// # }));
598    /// ```
599    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600    where
601        F: Fn(T) -> Option<U> + 'a,
602    {
603        let f = f.splice_fn1_ctx(&self.location).into();
604        Stream::new(
605            self.location.clone(),
606            HydroNode::FilterMap {
607                f,
608                input: Box::new(self.ir_node.into_inner()),
609                metadata: self
610                    .location
611                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612            },
613        )
614    }
615
616    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617    /// where `x` is the final value of `other`, a bounded [`Singleton`].
618    ///
619    /// # Example
620    /// ```rust
621    /// # use hydro_lang::prelude::*;
622    /// # use futures::StreamExt;
623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
624    /// let tick = process.tick();
625    /// let batch = process
626    ///   .source_iter(q!(vec![1, 2, 3, 4]))
627    ///   .batch(&tick, nondet!(/** test */));
628    /// let count = batch.clone().count(); // `count()` returns a singleton
629    /// batch.cross_singleton(count).all_ticks()
630    /// # }, |mut stream| async move {
631    /// // (1, 4), (2, 4), (3, 4), (4, 4)
632    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
633    /// #     assert_eq!(stream.next().await.unwrap(), w);
634    /// # }
635    /// # }));
636    /// ```
637    pub fn cross_singleton<O2>(
638        self,
639        other: impl Into<Optional<O2, L, Bounded>>,
640    ) -> Stream<(T, O2), L, B, O, R>
641    where
642        O2: Clone,
643    {
644        let other: Optional<O2, L, Bounded> = other.into();
645        check_matching_location(&self.location, &other.location);
646
647        Stream::new(
648            self.location.clone(),
649            HydroNode::CrossSingleton {
650                left: Box::new(self.ir_node.into_inner()),
651                right: Box::new(other.ir_node.into_inner()),
652                metadata: self
653                    .location
654                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
655            },
656        )
657    }
658
659    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
660    ///
661    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
662    /// leader of a cluster.
663    ///
664    /// # Example
665    /// ```rust
666    /// # use hydro_lang::prelude::*;
667    /// # use futures::StreamExt;
668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669    /// let tick = process.tick();
670    /// // ticks are lazy by default, forces the second tick to run
671    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672    ///
673    /// let batch_first_tick = process
674    ///   .source_iter(q!(vec![1, 2, 3, 4]))
675    ///   .batch(&tick, nondet!(/** test */));
676    /// let batch_second_tick = process
677    ///   .source_iter(q!(vec![5, 6, 7, 8]))
678    ///   .batch(&tick, nondet!(/** test */))
679    ///   .defer_tick(); // appears on the second tick
680    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
681    /// batch_first_tick.chain(batch_second_tick)
682    ///   .filter_if_some(some_on_first_tick)
683    ///   .all_ticks()
684    /// # }, |mut stream| async move {
685    /// // [1, 2, 3, 4]
686    /// # for w in vec![1, 2, 3, 4] {
687    /// #     assert_eq!(stream.next().await.unwrap(), w);
688    /// # }
689    /// # }));
690    /// ```
691    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
692        self.cross_singleton(signal.map(q!(|_u| ())))
693            .map(q!(|(d, _signal)| d))
694    }
695
696    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
697    ///
698    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
699    /// some local state.
700    ///
701    /// # Example
702    /// ```rust
703    /// # use hydro_lang::prelude::*;
704    /// # use futures::StreamExt;
705    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
706    /// let tick = process.tick();
707    /// // ticks are lazy by default, forces the second tick to run
708    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
709    ///
710    /// let batch_first_tick = process
711    ///   .source_iter(q!(vec![1, 2, 3, 4]))
712    ///   .batch(&tick, nondet!(/** test */));
713    /// let batch_second_tick = process
714    ///   .source_iter(q!(vec![5, 6, 7, 8]))
715    ///   .batch(&tick, nondet!(/** test */))
716    ///   .defer_tick(); // appears on the second tick
717    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
718    /// batch_first_tick.chain(batch_second_tick)
719    ///   .filter_if_none(some_on_first_tick)
720    ///   .all_ticks()
721    /// # }, |mut stream| async move {
722    /// // [5, 6, 7, 8]
723    /// # for w in vec![5, 6, 7, 8] {
724    /// #     assert_eq!(stream.next().await.unwrap(), w);
725    /// # }
726    /// # }));
727    /// ```
728    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
729        self.filter_if_some(
730            other
731                .map(q!(|_| ()))
732                .into_singleton()
733                .filter(q!(|o| o.is_none())),
734        )
735    }
736
737    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
738    /// tupled pairs in a non-deterministic order.
739    ///
740    /// # Example
741    /// ```rust
742    /// # use hydro_lang::prelude::*;
743    /// # use std::collections::HashSet;
744    /// # use futures::StreamExt;
745    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746    /// let tick = process.tick();
747    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
748    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
749    /// stream1.cross_product(stream2)
750    /// # }, |mut stream| async move {
751    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
752    /// # stream.map(|i| assert!(expected.contains(&i)));
753    /// # }));
754    /// ```
755    pub fn cross_product<T2, O2: Ordering>(
756        self,
757        other: Stream<T2, L, B, O2, R>,
758    ) -> Stream<(T, T2), L, B, NoOrder, R>
759    where
760        T: Clone,
761        T2: Clone,
762    {
763        check_matching_location(&self.location, &other.location);
764
765        Stream::new(
766            self.location.clone(),
767            HydroNode::CrossProduct {
768                left: Box::new(self.ir_node.into_inner()),
769                right: Box::new(other.ir_node.into_inner()),
770                metadata: self
771                    .location
772                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
773            },
774        )
775    }
776
777    /// Takes one stream as input and filters out any duplicate occurrences. The output
778    /// contains all unique values from the input.
779    ///
780    /// # Example
781    /// ```rust
782    /// # use hydro_lang::prelude::*;
783    /// # use futures::StreamExt;
784    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
785    /// let tick = process.tick();
786    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
787    /// # }, |mut stream| async move {
788    /// # for w in vec![1, 2, 3, 4] {
789    /// #     assert_eq!(stream.next().await.unwrap(), w);
790    /// # }
791    /// # }));
792    /// ```
793    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
794    where
795        T: Eq + Hash,
796    {
797        Stream::new(
798            self.location.clone(),
799            HydroNode::Unique {
800                input: Box::new(self.ir_node.into_inner()),
801                metadata: self
802                    .location
803                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
804            },
805        )
806    }
807
808    /// Outputs everything in this stream that is *not* contained in the `other` stream.
809    ///
810    /// The `other` stream must be [`Bounded`], since this function will wait until
811    /// all its elements are available before producing any output.
812    /// # Example
813    /// ```rust
814    /// # use hydro_lang::prelude::*;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817    /// let tick = process.tick();
818    /// let stream = process
819    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
820    ///   .batch(&tick, nondet!(/** test */));
821    /// let batch = process
822    ///   .source_iter(q!(vec![1, 2]))
823    ///   .batch(&tick, nondet!(/** test */));
824    /// stream.filter_not_in(batch).all_ticks()
825    /// # }, |mut stream| async move {
826    /// # for w in vec![3, 4] {
827    /// #     assert_eq!(stream.next().await.unwrap(), w);
828    /// # }
829    /// # }));
830    /// ```
831    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
832    where
833        T: Eq + Hash,
834    {
835        check_matching_location(&self.location, &other.location);
836
837        Stream::new(
838            self.location.clone(),
839            HydroNode::Difference {
840                pos: Box::new(self.ir_node.into_inner()),
841                neg: Box::new(other.ir_node.into_inner()),
842                metadata: self
843                    .location
844                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
845            },
846        )
847    }
848
849    /// An operator which allows you to "inspect" each element of a stream without
850    /// modifying it. The closure `f` is called on a reference to each item. This is
851    /// mainly useful for debugging, and should not be used to generate side-effects.
852    ///
853    /// # Example
854    /// ```rust
855    /// # use hydro_lang::prelude::*;
856    /// # use futures::StreamExt;
857    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
858    /// let nums = process.source_iter(q!(vec![1, 2]));
859    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
860    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
861    /// # }, |mut stream| async move {
862    /// # for w in vec![1, 2] {
863    /// #     assert_eq!(stream.next().await.unwrap(), w);
864    /// # }
865    /// # }));
866    /// ```
867    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
868    where
869        F: Fn(&T) + 'a,
870    {
871        let f = f.splice_fn1_borrow_ctx(&self.location).into();
872
873        Stream::new(
874            self.location.clone(),
875            HydroNode::Inspect {
876                f,
877                input: Box::new(self.ir_node.into_inner()),
878                metadata: self.location.new_node_metadata(Self::collection_kind()),
879            },
880        )
881    }
882
883    /// An operator which allows you to "name" a `HydroNode`.
884    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
885    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
886        {
887            let mut node = self.ir_node.borrow_mut();
888            let metadata = node.metadata_mut();
889            metadata.tag = Some(name.to_string());
890        }
891        self
892    }
893
894    /// Explicitly "casts" the stream to a type with a different ordering
895    /// guarantee. Useful in unsafe code where the ordering cannot be proven
896    /// by the type-system.
897    ///
898    /// # Non-Determinism
899    /// This function is used as an escape hatch, and any mistakes in the
900    /// provided ordering guarantee will propagate into the guarantees
901    /// for the rest of the program.
902    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
903        if O::ORDERING_KIND == O2::ORDERING_KIND {
904            Stream::new(self.location, self.ir_node.into_inner())
905        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
906            // We can always weaken the ordering guarantee
907            Stream::new(
908                self.location.clone(),
909                HydroNode::Cast {
910                    inner: Box::new(self.ir_node.into_inner()),
911                    metadata: self
912                        .location
913                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
914                },
915            )
916        } else {
917            Stream::new(
918                self.location.clone(),
919                HydroNode::ObserveNonDet {
920                    inner: Box::new(self.ir_node.into_inner()),
921                    trusted: false,
922                    metadata: self
923                        .location
924                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
925                },
926            )
927        }
928    }
929
930    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
931    // is not observable
932    fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933        if O::ORDERING_KIND == O2::ORDERING_KIND {
934            Stream::new(self.location, self.ir_node.into_inner())
935        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936            // We can always weaken the ordering guarantee
937            Stream::new(
938                self.location.clone(),
939                HydroNode::Cast {
940                    inner: Box::new(self.ir_node.into_inner()),
941                    metadata: self
942                        .location
943                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944                },
945            )
946        } else {
947            Stream::new(
948                self.location.clone(),
949                HydroNode::ObserveNonDet {
950                    inner: Box::new(self.ir_node.into_inner()),
951                    trusted: true,
952                    metadata: self
953                        .location
954                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955                },
956            )
957        }
958    }
959
960    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
961    /// which is always safe because that is the weakest possible guarantee.
962    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
963        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
964        self.assume_ordering::<NoOrder>(nondet)
965    }
966
967    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
968    /// enforcing that `O2` is weaker than the input ordering guarantee.
969    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
970        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
971        self.assume_ordering::<O2>(nondet)
972    }
973
974    /// Explicitly "casts" the stream to a type with a different retries
975    /// guarantee. Useful in unsafe code where the lack of retries cannot
976    /// be proven by the type-system.
977    ///
978    /// # Non-Determinism
979    /// This function is used as an escape hatch, and any mistakes in the
980    /// provided retries guarantee will propagate into the guarantees
981    /// for the rest of the program.
982    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
983        if R::RETRIES_KIND == R2::RETRIES_KIND {
984            Stream::new(self.location, self.ir_node.into_inner())
985        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
986            // We can always weaken the retries guarantee
987            Stream::new(
988                self.location.clone(),
989                HydroNode::Cast {
990                    inner: Box::new(self.ir_node.into_inner()),
991                    metadata: self
992                        .location
993                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
994                },
995            )
996        } else {
997            Stream::new(
998                self.location.clone(),
999                HydroNode::ObserveNonDet {
1000                    inner: Box::new(self.ir_node.into_inner()),
1001                    trusted: false,
1002                    metadata: self
1003                        .location
1004                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1005                },
1006            )
1007        }
1008    }
1009
1010    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1011    // is not observable
1012    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013        if R::RETRIES_KIND == R2::RETRIES_KIND {
1014            Stream::new(self.location, self.ir_node.into_inner())
1015        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016            // We can always weaken the retries guarantee
1017            Stream::new(
1018                self.location.clone(),
1019                HydroNode::Cast {
1020                    inner: Box::new(self.ir_node.into_inner()),
1021                    metadata: self
1022                        .location
1023                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024                },
1025            )
1026        } else {
1027            Stream::new(
1028                self.location.clone(),
1029                HydroNode::ObserveNonDet {
1030                    inner: Box::new(self.ir_node.into_inner()),
1031                    trusted: true,
1032                    metadata: self
1033                        .location
1034                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035                },
1036            )
1037        }
1038    }
1039
1040    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1041    /// which is always safe because that is the weakest possible guarantee.
1042    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1043        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1044        self.assume_retries::<AtLeastOnce>(nondet)
1045    }
1046
1047    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1048    /// enforcing that `R2` is weaker than the input retries guarantee.
1049    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1050        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1051        self.assume_retries::<R2>(nondet)
1052    }
1053}
1054
1055impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1056where
1057    L: Location<'a>,
1058{
1059    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1060    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1061    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1062        self.assume_retries(
1063            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1064        )
1065    }
1066}
1067
1068impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1069where
1070    L: Location<'a>,
1071{
1072    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1073    ///
1074    /// # Example
1075    /// ```rust
1076    /// # use hydro_lang::prelude::*;
1077    /// # use futures::StreamExt;
1078    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1079    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1080    /// # }, |mut stream| async move {
1081    /// // 1, 2, 3
1082    /// # for w in vec![1, 2, 3] {
1083    /// #     assert_eq!(stream.next().await.unwrap(), w);
1084    /// # }
1085    /// # }));
1086    /// ```
1087    pub fn cloned(self) -> Stream<T, L, B, O, R>
1088    where
1089        T: Clone,
1090    {
1091        self.map(q!(|d| d.clone()))
1092    }
1093}
1094
1095impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1096where
1097    L: Location<'a>,
1098{
1099    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1100    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1101    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1102    ///
1103    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1104    /// and there may be duplicates.
1105    ///
1106    /// # Example
1107    /// ```rust
1108    /// # use hydro_lang::prelude::*;
1109    /// # use futures::StreamExt;
1110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1111    /// let tick = process.tick();
1112    /// let bools = process.source_iter(q!(vec![false, true, false]));
1113    /// let batch = bools.batch(&tick, nondet!(/** test */));
1114    /// batch
1115    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1116    ///     .all_ticks()
1117    /// # }, |mut stream| async move {
1118    /// // true
1119    /// # assert_eq!(stream.next().await.unwrap(), true);
1120    /// # }));
1121    /// ```
1122    pub fn fold_commutative_idempotent<A, I, F>(
1123        self,
1124        init: impl IntoQuotedMut<'a, I, L>,
1125        comb: impl IntoQuotedMut<'a, F, L>,
1126    ) -> Singleton<A, L, B>
1127    where
1128        I: Fn() -> A + 'a,
1129        F: Fn(&mut A, T),
1130    {
1131        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1132        self.assume_ordering(nondet)
1133            .assume_retries(nondet)
1134            .fold(init, comb)
1135    }
1136
1137    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1138    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1139    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1140    /// reference, so that it can be modified in place.
1141    ///
1142    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1143    /// and there may be duplicates.
1144    ///
1145    /// # Example
1146    /// ```rust
1147    /// # use hydro_lang::prelude::*;
1148    /// # use futures::StreamExt;
1149    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150    /// let tick = process.tick();
1151    /// let bools = process.source_iter(q!(vec![false, true, false]));
1152    /// let batch = bools.batch(&tick, nondet!(/** test */));
1153    /// batch
1154    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1155    ///     .all_ticks()
1156    /// # }, |mut stream| async move {
1157    /// // true
1158    /// # assert_eq!(stream.next().await.unwrap(), true);
1159    /// # }));
1160    /// ```
1161    pub fn reduce_commutative_idempotent<F>(
1162        self,
1163        comb: impl IntoQuotedMut<'a, F, L>,
1164    ) -> Optional<T, L, B>
1165    where
1166        F: Fn(&mut T, T) + 'a,
1167    {
1168        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1169        self.assume_ordering(nondet)
1170            .assume_retries(nondet)
1171            .reduce(comb)
1172    }
1173
1174    // only for internal APIs that have been carefully vetted, will eventually be removed once we
1175    // have algebraic verification of these properties
1176    fn reduce_commutative_idempotent_trusted<F>(
1177        self,
1178        comb: impl IntoQuotedMut<'a, F, L>,
1179    ) -> Optional<T, L, B>
1180    where
1181        F: Fn(&mut T, T) + 'a,
1182    {
1183        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1184
1185        let ordered = if B::BOUNDED {
1186            self.assume_ordering_trusted(nondet)
1187        } else {
1188            self.assume_ordering(nondet) // if unbounded, ordering affects intermediate states
1189        };
1190
1191        ordered
1192            .assume_retries_trusted(nondet) // retries never affect intermediate states
1193            .reduce(comb)
1194    }
1195
1196    /// Computes the maximum element in the stream as an [`Optional`], which
1197    /// will be empty until the first element in the input arrives.
1198    ///
1199    /// # Example
1200    /// ```rust
1201    /// # use hydro_lang::prelude::*;
1202    /// # use futures::StreamExt;
1203    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1204    /// let tick = process.tick();
1205    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1206    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1207    /// batch.max().all_ticks()
1208    /// # }, |mut stream| async move {
1209    /// // 4
1210    /// # assert_eq!(stream.next().await.unwrap(), 4);
1211    /// # }));
1212    /// ```
1213    pub fn max(self) -> Optional<T, L, B>
1214    where
1215        T: Ord,
1216    {
1217        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1218            if new > *curr {
1219                *curr = new;
1220            }
1221        }))
1222    }
1223
1224    /// Computes the minimum element in the stream as an [`Optional`], which
1225    /// will be empty until the first element in the input arrives.
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # use hydro_lang::prelude::*;
1230    /// # use futures::StreamExt;
1231    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232    /// let tick = process.tick();
1233    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235    /// batch.min().all_ticks()
1236    /// # }, |mut stream| async move {
1237    /// // 1
1238    /// # assert_eq!(stream.next().await.unwrap(), 1);
1239    /// # }));
1240    /// ```
1241    pub fn min(self) -> Optional<T, L, B>
1242    where
1243        T: Ord,
1244    {
1245        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1246            if new < *curr {
1247                *curr = new;
1248            }
1249        }))
1250    }
1251}
1252
1253impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1254where
1255    L: Location<'a>,
1256{
1257    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1258    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1259    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1260    ///
1261    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1262    ///
1263    /// # Example
1264    /// ```rust
1265    /// # use hydro_lang::prelude::*;
1266    /// # use futures::StreamExt;
1267    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1268    /// let tick = process.tick();
1269    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1270    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1271    /// batch
1272    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1273    ///     .all_ticks()
1274    /// # }, |mut stream| async move {
1275    /// // 10
1276    /// # assert_eq!(stream.next().await.unwrap(), 10);
1277    /// # }));
1278    /// ```
1279    pub fn fold_commutative<A, I, F>(
1280        self,
1281        init: impl IntoQuotedMut<'a, I, L>,
1282        comb: impl IntoQuotedMut<'a, F, L>,
1283    ) -> Singleton<A, L, B>
1284    where
1285        I: Fn() -> A + 'a,
1286        F: Fn(&mut A, T),
1287    {
1288        let nondet = nondet!(/** the combinator function is commutative */);
1289        self.assume_ordering(nondet).fold(init, comb)
1290    }
1291
1292    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1293    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1294    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1295    /// reference, so that it can be modified in place.
1296    ///
1297    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1298    ///
1299    /// # Example
1300    /// ```rust
1301    /// # use hydro_lang::prelude::*;
1302    /// # use futures::StreamExt;
1303    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1304    /// let tick = process.tick();
1305    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1306    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1307    /// batch
1308    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1309    ///     .all_ticks()
1310    /// # }, |mut stream| async move {
1311    /// // 10
1312    /// # assert_eq!(stream.next().await.unwrap(), 10);
1313    /// # }));
1314    /// ```
1315    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1316    where
1317        F: Fn(&mut T, T) + 'a,
1318    {
1319        let nondet = nondet!(/** the combinator function is commutative */);
1320        self.assume_ordering(nondet).reduce(comb)
1321    }
1322
1323    /// Computes the number of elements in the stream as a [`Singleton`].
1324    ///
1325    /// # Example
1326    /// ```rust
1327    /// # use hydro_lang::prelude::*;
1328    /// # use futures::StreamExt;
1329    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1330    /// let tick = process.tick();
1331    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1332    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1333    /// batch.count().all_ticks()
1334    /// # }, |mut stream| async move {
1335    /// // 4
1336    /// # assert_eq!(stream.next().await.unwrap(), 4);
1337    /// # }));
1338    /// ```
1339    pub fn count(self) -> Singleton<usize, L, B> {
1340        self.assume_ordering_trusted(nondet!(
1341            /// Order does not affect eventual count, and also does not affect intermediate states.
1342        ))
1343        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1344    }
1345}
1346
1347impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1348where
1349    L: Location<'a>,
1350{
1351    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1352    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1353    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1354    ///
1355    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1356    ///
1357    /// # Example
1358    /// ```rust
1359    /// # use hydro_lang::prelude::*;
1360    /// # use futures::StreamExt;
1361    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1362    /// let tick = process.tick();
1363    /// let bools = process.source_iter(q!(vec![false, true, false]));
1364    /// let batch = bools.batch(&tick, nondet!(/** test */));
1365    /// batch
1366    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1367    ///     .all_ticks()
1368    /// # }, |mut stream| async move {
1369    /// // true
1370    /// # assert_eq!(stream.next().await.unwrap(), true);
1371    /// # }));
1372    /// ```
1373    pub fn fold_idempotent<A, I, F>(
1374        self,
1375        init: impl IntoQuotedMut<'a, I, L>,
1376        comb: impl IntoQuotedMut<'a, F, L>,
1377    ) -> Singleton<A, L, B>
1378    where
1379        I: Fn() -> A + 'a,
1380        F: Fn(&mut A, T),
1381    {
1382        let nondet = nondet!(/** the combinator function is idempotent */);
1383        self.assume_retries(nondet).fold(init, comb)
1384    }
1385
1386    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1387    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1388    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1389    /// reference, so that it can be modified in place.
1390    ///
1391    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1392    ///
1393    /// # Example
1394    /// ```rust
1395    /// # use hydro_lang::prelude::*;
1396    /// # use futures::StreamExt;
1397    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1398    /// let tick = process.tick();
1399    /// let bools = process.source_iter(q!(vec![false, true, false]));
1400    /// let batch = bools.batch(&tick, nondet!(/** test */));
1401    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1402    /// # }, |mut stream| async move {
1403    /// // true
1404    /// # assert_eq!(stream.next().await.unwrap(), true);
1405    /// # }));
1406    /// ```
1407    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1408    where
1409        F: Fn(&mut T, T) + 'a,
1410    {
1411        let nondet = nondet!(/** the combinator function is idempotent */);
1412        self.assume_retries(nondet).reduce(comb)
1413    }
1414
1415    /// Computes the first element in the stream as an [`Optional`], which
1416    /// will be empty until the first element in the input arrives.
1417    ///
1418    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1419    /// re-ordering of elements may cause the first element to change.
1420    ///
1421    /// # Example
1422    /// ```rust
1423    /// # use hydro_lang::prelude::*;
1424    /// # use futures::StreamExt;
1425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1426    /// let tick = process.tick();
1427    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1428    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1429    /// batch.first().all_ticks()
1430    /// # }, |mut stream| async move {
1431    /// // 1
1432    /// # assert_eq!(stream.next().await.unwrap(), 1);
1433    /// # }));
1434    /// ```
1435    pub fn first(self) -> Optional<T, L, B> {
1436        self.reduce_idempotent(q!(|_, _| {}))
1437    }
1438
1439    /// Computes the last element in the stream as an [`Optional`], which
1440    /// will be empty until an element in the input arrives.
1441    ///
1442    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1443    /// re-ordering of elements may cause the last element to change.
1444    ///
1445    /// # Example
1446    /// ```rust
1447    /// # use hydro_lang::prelude::*;
1448    /// # use futures::StreamExt;
1449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450    /// let tick = process.tick();
1451    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1452    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1453    /// batch.last().all_ticks()
1454    /// # }, |mut stream| async move {
1455    /// // 4
1456    /// # assert_eq!(stream.next().await.unwrap(), 4);
1457    /// # }));
1458    /// ```
1459    pub fn last(self) -> Optional<T, L, B> {
1460        self.reduce_idempotent(q!(|curr, new| *curr = new))
1461    }
1462}
1463
1464impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1465where
1466    L: Location<'a>,
1467{
1468    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1469    ///
1470    /// # Example
1471    /// ```rust
1472    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1473    /// # use futures::StreamExt;
1474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1475    /// let tick = process.tick();
1476    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1477    /// numbers.enumerate()
1478    /// # }, |mut stream| async move {
1479    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1480    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1481    /// #     assert_eq!(stream.next().await.unwrap(), w);
1482    /// # }
1483    /// # }));
1484    /// ```
1485    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1486        Stream::new(
1487            self.location.clone(),
1488            HydroNode::Enumerate {
1489                input: Box::new(self.ir_node.into_inner()),
1490                metadata: self.location.new_node_metadata(Stream::<
1491                    (usize, T),
1492                    L,
1493                    B,
1494                    TotalOrder,
1495                    ExactlyOnce,
1496                >::collection_kind()),
1497            },
1498        )
1499    }
1500
1501    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1502    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1503    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1504    ///
1505    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1506    /// to depend on the order of elements in the stream.
1507    ///
1508    /// # Example
1509    /// ```rust
1510    /// # use hydro_lang::prelude::*;
1511    /// # use futures::StreamExt;
1512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513    /// let tick = process.tick();
1514    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1515    /// let batch = words.batch(&tick, nondet!(/** test */));
1516    /// batch
1517    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1518    ///     .all_ticks()
1519    /// # }, |mut stream| async move {
1520    /// // "HELLOWORLD"
1521    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1522    /// # }));
1523    /// ```
1524    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1525        self,
1526        init: impl IntoQuotedMut<'a, I, L>,
1527        comb: impl IntoQuotedMut<'a, F, L>,
1528    ) -> Singleton<A, L, B> {
1529        let init = init.splice_fn0_ctx(&self.location).into();
1530        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1531
1532        let core = HydroNode::Fold {
1533            init,
1534            acc: comb,
1535            input: Box::new(self.ir_node.into_inner()),
1536            metadata: self
1537                .location
1538                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1539        };
1540
1541        Singleton::new(self.location, core)
1542    }
1543
1544    /// Collects all the elements of this stream into a single [`Vec`] element.
1545    ///
1546    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1547    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1548    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1549    /// the vector at an arbitrary point in time.
1550    ///
1551    /// # Example
1552    /// ```rust
1553    /// # use hydro_lang::prelude::*;
1554    /// # use futures::StreamExt;
1555    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1556    /// let tick = process.tick();
1557    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1558    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1559    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1560    /// # }, |mut stream| async move {
1561    /// // [ vec![1, 2, 3, 4] ]
1562    /// # for w in vec![vec![1, 2, 3, 4]] {
1563    /// #     assert_eq!(stream.next().await.unwrap(), w);
1564    /// # }
1565    /// # }));
1566    /// ```
1567    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1568        self.fold(
1569            q!(|| vec![]),
1570            q!(|acc, v| {
1571                acc.push(v);
1572            }),
1573        )
1574    }
1575
1576    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1577    /// and emitting each intermediate result.
1578    ///
1579    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1580    /// containing all intermediate accumulated values. The scan operation can also terminate early
1581    /// by returning `None`.
1582    ///
1583    /// The function takes a mutable reference to the accumulator and the current element, and returns
1584    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1585    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1586    ///
1587    /// # Examples
1588    ///
1589    /// Basic usage - running sum:
1590    /// ```rust
1591    /// # use hydro_lang::prelude::*;
1592    /// # use futures::StreamExt;
1593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1594    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1595    ///     q!(|| 0),
1596    ///     q!(|acc, x| {
1597    ///         *acc += x;
1598    ///         Some(*acc)
1599    ///     }),
1600    /// )
1601    /// # }, |mut stream| async move {
1602    /// // Output: 1, 3, 6, 10
1603    /// # for w in vec![1, 3, 6, 10] {
1604    /// #     assert_eq!(stream.next().await.unwrap(), w);
1605    /// # }
1606    /// # }));
1607    /// ```
1608    ///
1609    /// Early termination example:
1610    /// ```rust
1611    /// # use hydro_lang::prelude::*;
1612    /// # use futures::StreamExt;
1613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1614    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1615    ///     q!(|| 1),
1616    ///     q!(|state, x| {
1617    ///         *state = *state * x;
1618    ///         if *state > 6 {
1619    ///             None // Terminate the stream
1620    ///         } else {
1621    ///             Some(-*state)
1622    ///         }
1623    ///     }),
1624    /// )
1625    /// # }, |mut stream| async move {
1626    /// // Output: -1, -2, -6
1627    /// # for w in vec![-1, -2, -6] {
1628    /// #     assert_eq!(stream.next().await.unwrap(), w);
1629    /// # }
1630    /// # }));
1631    /// ```
1632    pub fn scan<A, U, I, F>(
1633        self,
1634        init: impl IntoQuotedMut<'a, I, L>,
1635        f: impl IntoQuotedMut<'a, F, L>,
1636    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1637    where
1638        I: Fn() -> A + 'a,
1639        F: Fn(&mut A, T) -> Option<U> + 'a,
1640    {
1641        let init = init.splice_fn0_ctx(&self.location).into();
1642        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1643
1644        Stream::new(
1645            self.location.clone(),
1646            HydroNode::Scan {
1647                init,
1648                acc: f,
1649                input: Box::new(self.ir_node.into_inner()),
1650                metadata: self.location.new_node_metadata(
1651                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1652                ),
1653            },
1654        )
1655    }
1656
1657    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1658    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1659    /// until the first element in the input arrives.
1660    ///
1661    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1662    /// to depend on the order of elements in the stream.
1663    ///
1664    /// # Example
1665    /// ```rust
1666    /// # use hydro_lang::prelude::*;
1667    /// # use futures::StreamExt;
1668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669    /// let tick = process.tick();
1670    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1671    /// let batch = words.batch(&tick, nondet!(/** test */));
1672    /// batch
1673    ///     .map(q!(|x| x.to_string()))
1674    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1675    ///     .all_ticks()
1676    /// # }, |mut stream| async move {
1677    /// // "HELLOWORLD"
1678    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1679    /// # }));
1680    /// ```
1681    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1682        self,
1683        comb: impl IntoQuotedMut<'a, F, L>,
1684    ) -> Optional<T, L, B> {
1685        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1686        let core = HydroNode::Reduce {
1687            f,
1688            input: Box::new(self.ir_node.into_inner()),
1689            metadata: self
1690                .location
1691                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1692        };
1693
1694        Optional::new(self.location, core)
1695    }
1696}
1697
1698impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1699    /// Produces a new stream that interleaves the elements of the two input streams.
1700    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1701    ///
1702    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1703    /// [`Bounded`], you can use [`Stream::chain`] instead.
1704    ///
1705    /// # Example
1706    /// ```rust
1707    /// # use hydro_lang::prelude::*;
1708    /// # use futures::StreamExt;
1709    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1710    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1711    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1712    /// # }, |mut stream| async move {
1713    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1714    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1715    /// #     assert_eq!(stream.next().await.unwrap(), w);
1716    /// # }
1717    /// # }));
1718    /// ```
1719    pub fn interleave<O2: Ordering, R2: Retries>(
1720        self,
1721        other: Stream<T, L, Unbounded, O2, R2>,
1722    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1723    where
1724        R: MinRetries<R2>,
1725    {
1726        Stream::new(
1727            self.location.clone(),
1728            HydroNode::Chain {
1729                first: Box::new(self.ir_node.into_inner()),
1730                second: Box::new(other.ir_node.into_inner()),
1731                metadata: self.location.new_node_metadata(Stream::<
1732                    T,
1733                    L,
1734                    Unbounded,
1735                    NoOrder,
1736                    <R as MinRetries<R2>>::Min,
1737                >::collection_kind()),
1738            },
1739        )
1740    }
1741}
1742
1743impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1744where
1745    L: Location<'a>,
1746{
1747    /// Produces a new stream that emits the input elements in sorted order.
1748    ///
1749    /// The input stream can have any ordering guarantee, but the output stream
1750    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1751    /// elements in the input stream are available, so it requires the input stream
1752    /// to be [`Bounded`].
1753    ///
1754    /// # Example
1755    /// ```rust
1756    /// # use hydro_lang::prelude::*;
1757    /// # use futures::StreamExt;
1758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1759    /// let tick = process.tick();
1760    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1761    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1762    /// batch.sort().all_ticks()
1763    /// # }, |mut stream| async move {
1764    /// // 1, 2, 3, 4
1765    /// # for w in (1..5) {
1766    /// #     assert_eq!(stream.next().await.unwrap(), w);
1767    /// # }
1768    /// # }));
1769    /// ```
1770    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1771    where
1772        T: Ord,
1773    {
1774        Stream::new(
1775            self.location.clone(),
1776            HydroNode::Sort {
1777                input: Box::new(self.ir_node.into_inner()),
1778                metadata: self
1779                    .location
1780                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1781            },
1782        )
1783    }
1784
1785    /// Produces a new stream that first emits the elements of the `self` stream,
1786    /// and then emits the elements of the `other` stream. The output stream has
1787    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1788    /// [`TotalOrder`] guarantee.
1789    ///
1790    /// Currently, both input streams must be [`Bounded`]. This operator will block
1791    /// on the first stream until all its elements are available. In a future version,
1792    /// we will relax the requirement on the `other` stream.
1793    ///
1794    /// # Example
1795    /// ```rust
1796    /// # use hydro_lang::prelude::*;
1797    /// # use futures::StreamExt;
1798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799    /// let tick = process.tick();
1800    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1801    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1802    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1803    /// # }, |mut stream| async move {
1804    /// // 2, 3, 4, 5, 1, 2, 3, 4
1805    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1806    /// #     assert_eq!(stream.next().await.unwrap(), w);
1807    /// # }
1808    /// # }));
1809    /// ```
1810    pub fn chain<O2: Ordering, R2: Retries>(
1811        self,
1812        other: Stream<T, L, Bounded, O2, R2>,
1813    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1814    where
1815        O: MinOrder<O2>,
1816        R: MinRetries<R2>,
1817    {
1818        check_matching_location(&self.location, &other.location);
1819
1820        Stream::new(
1821            self.location.clone(),
1822            HydroNode::Chain {
1823                first: Box::new(self.ir_node.into_inner()),
1824                second: Box::new(other.ir_node.into_inner()),
1825                metadata: self.location.new_node_metadata(Stream::<
1826                    T,
1827                    L,
1828                    Bounded,
1829                    <O as MinOrder<O2>>::Min,
1830                    <R as MinRetries<R2>>::Min,
1831                >::collection_kind()),
1832            },
1833        )
1834    }
1835
1836    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1837    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1838    /// because this is compiled into a nested loop.
1839    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1840        self,
1841        other: Stream<T2, L, Bounded, O2, R>,
1842    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1843    where
1844        T: Clone,
1845        T2: Clone,
1846    {
1847        check_matching_location(&self.location, &other.location);
1848
1849        Stream::new(
1850            self.location.clone(),
1851            HydroNode::CrossProduct {
1852                left: Box::new(self.ir_node.into_inner()),
1853                right: Box::new(other.ir_node.into_inner()),
1854                metadata: self.location.new_node_metadata(Stream::<
1855                    (T, T2),
1856                    L,
1857                    Bounded,
1858                    <O2 as MinOrder<O>>::Min,
1859                    R,
1860                >::collection_kind()),
1861            },
1862        )
1863    }
1864
1865    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1866    /// `self` used as the values for *each* key.
1867    ///
1868    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1869    /// values. For example, it can be used to send the same set of elements to several cluster
1870    /// members, if the membership information is available as a [`KeyedSingleton`].
1871    ///
1872    /// # Example
1873    /// ```rust
1874    /// # use hydro_lang::prelude::*;
1875    /// # use futures::StreamExt;
1876    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1877    /// # let tick = process.tick();
1878    /// let keyed_singleton = // { 1: (), 2: () }
1879    /// # process
1880    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1881    /// #     .into_keyed()
1882    /// #     .batch(&tick, nondet!(/** test */))
1883    /// #     .first();
1884    /// let stream = // [ "a", "b" ]
1885    /// # process
1886    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1887    /// #     .batch(&tick, nondet!(/** test */));
1888    /// stream.repeat_with_keys(keyed_singleton)
1889    /// # .entries().all_ticks()
1890    /// # }, |mut stream| async move {
1891    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1892    /// # let mut results = Vec::new();
1893    /// # for _ in 0..4 {
1894    /// #     results.push(stream.next().await.unwrap());
1895    /// # }
1896    /// # results.sort();
1897    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1898    /// # }));
1899    /// ```
1900    pub fn repeat_with_keys<K, V2>(
1901        self,
1902        keys: KeyedSingleton<K, V2, L, Bounded>,
1903    ) -> KeyedStream<K, T, L, Bounded, O, R>
1904    where
1905        K: Clone,
1906        T: Clone,
1907    {
1908        keys.keys()
1909            .weaken_retries()
1910            .assume_ordering_trusted::<TotalOrder>(
1911                nondet!(/** keyed stream does not depend on ordering of keys */),
1912            )
1913            .cross_product_nested_loop(self)
1914            .into_keyed()
1915    }
1916}
1917
1918impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1919where
1920    L: Location<'a>,
1921{
1922    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1923    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1924    /// by equi-joining the two streams on the key attribute `K`.
1925    ///
1926    /// # Example
1927    /// ```rust
1928    /// # use hydro_lang::prelude::*;
1929    /// # use std::collections::HashSet;
1930    /// # use futures::StreamExt;
1931    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1932    /// let tick = process.tick();
1933    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1934    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1935    /// stream1.join(stream2)
1936    /// # }, |mut stream| async move {
1937    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1938    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1939    /// # stream.map(|i| assert!(expected.contains(&i)));
1940    /// # }));
1941    pub fn join<V2, O2: Ordering, R2: Retries>(
1942        self,
1943        n: Stream<(K, V2), L, B, O2, R2>,
1944    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1945    where
1946        K: Eq + Hash,
1947        R: MinRetries<R2>,
1948    {
1949        check_matching_location(&self.location, &n.location);
1950
1951        Stream::new(
1952            self.location.clone(),
1953            HydroNode::Join {
1954                left: Box::new(self.ir_node.into_inner()),
1955                right: Box::new(n.ir_node.into_inner()),
1956                metadata: self.location.new_node_metadata(Stream::<
1957                    (K, (V1, V2)),
1958                    L,
1959                    B,
1960                    NoOrder,
1961                    <R as MinRetries<R2>>::Min,
1962                >::collection_kind()),
1963            },
1964        )
1965    }
1966
1967    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1968    /// computes the anti-join of the items in the input -- i.e. returns
1969    /// unique items in the first input that do not have a matching key
1970    /// in the second input.
1971    ///
1972    /// # Example
1973    /// ```rust
1974    /// # use hydro_lang::prelude::*;
1975    /// # use futures::StreamExt;
1976    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1977    /// let tick = process.tick();
1978    /// let stream = process
1979    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1980    ///   .batch(&tick, nondet!(/** test */));
1981    /// let batch = process
1982    ///   .source_iter(q!(vec![1, 2]))
1983    ///   .batch(&tick, nondet!(/** test */));
1984    /// stream.anti_join(batch).all_ticks()
1985    /// # }, |mut stream| async move {
1986    /// # for w in vec![(3, 'c'), (4, 'd')] {
1987    /// #     assert_eq!(stream.next().await.unwrap(), w);
1988    /// # }
1989    /// # }));
1990    pub fn anti_join<O2: Ordering, R2: Retries>(
1991        self,
1992        n: Stream<K, L, Bounded, O2, R2>,
1993    ) -> Stream<(K, V1), L, B, O, R>
1994    where
1995        K: Eq + Hash,
1996    {
1997        check_matching_location(&self.location, &n.location);
1998
1999        Stream::new(
2000            self.location.clone(),
2001            HydroNode::AntiJoin {
2002                pos: Box::new(self.ir_node.into_inner()),
2003                neg: Box::new(n.ir_node.into_inner()),
2004                metadata: self
2005                    .location
2006                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2007            },
2008        )
2009    }
2010}
2011
2012impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2013    Stream<(K, V), L, B, O, R>
2014{
2015    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2016    /// is used as the key and the second element is added to the entries associated with that key.
2017    ///
2018    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2019    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2020    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2021    /// total ordering _within_ each group but no ordering _across_ groups.
2022    ///
2023    /// # Example
2024    /// ```rust
2025    /// # use hydro_lang::prelude::*;
2026    /// # use futures::StreamExt;
2027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2028    /// process
2029    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2030    ///     .into_keyed()
2031    /// #   .entries()
2032    /// # }, |mut stream| async move {
2033    /// // { 1: [2, 3], 2: [4] }
2034    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2035    /// #     assert_eq!(stream.next().await.unwrap(), w);
2036    /// # }
2037    /// # }));
2038    /// ```
2039    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2040        KeyedStream::new(
2041            self.location.clone(),
2042            HydroNode::Cast {
2043                inner: Box::new(self.ir_node.into_inner()),
2044                metadata: self
2045                    .location
2046                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2047            },
2048        )
2049    }
2050}
2051
2052impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2053where
2054    K: Eq + Hash,
2055    L: Location<'a>,
2056{
2057    #[deprecated = "use .into_keyed().fold(...) instead"]
2058    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2059    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2060    /// in the second element are accumulated via the `comb` closure.
2061    ///
2062    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2063    /// to depend on the order of elements in the stream.
2064    ///
2065    /// If the input and output value types are the same and do not require initialization then use
2066    /// [`Stream::reduce_keyed`].
2067    ///
2068    /// # Example
2069    /// ```rust
2070    /// # use hydro_lang::prelude::*;
2071    /// # use futures::StreamExt;
2072    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2073    /// let tick = process.tick();
2074    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2075    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2076    /// batch
2077    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2078    ///     .all_ticks()
2079    /// # }, |mut stream| async move {
2080    /// // (1, 5), (2, 7)
2081    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2082    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2083    /// # }));
2084    /// ```
2085    pub fn fold_keyed<A, I, F>(
2086        self,
2087        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2088        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2089    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2090    where
2091        I: Fn() -> A + 'a,
2092        F: Fn(&mut A, V) + 'a,
2093    {
2094        self.into_keyed().fold(init, comb).entries()
2095    }
2096
2097    #[deprecated = "use .into_keyed().reduce(...) instead"]
2098    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2099    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2100    /// in the second element are accumulated via the `comb` closure.
2101    ///
2102    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2103    /// to depend on the order of elements in the stream.
2104    ///
2105    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2106    ///
2107    /// # Example
2108    /// ```rust
2109    /// # use hydro_lang::prelude::*;
2110    /// # use futures::StreamExt;
2111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2112    /// let tick = process.tick();
2113    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2114    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2115    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2116    /// # }, |mut stream| async move {
2117    /// // (1, 5), (2, 7)
2118    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2119    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2120    /// # }));
2121    /// ```
2122    pub fn reduce_keyed<F>(
2123        self,
2124        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2125    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2126    where
2127        F: Fn(&mut V, V) + 'a,
2128    {
2129        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2130
2131        Stream::new(
2132            self.location.clone(),
2133            HydroNode::ReduceKeyed {
2134                f,
2135                input: Box::new(self.ir_node.into_inner()),
2136                metadata: self.location.new_node_metadata(Stream::<
2137                    (K, V),
2138                    Tick<L>,
2139                    Bounded,
2140                    NoOrder,
2141                    ExactlyOnce,
2142                >::collection_kind()),
2143            },
2144        )
2145    }
2146}
2147
2148impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2149where
2150    K: Eq + Hash,
2151    L: Location<'a>,
2152{
2153    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2154    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2155    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2156    /// in the second element are accumulated via the `comb` closure.
2157    ///
2158    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2159    /// as there may be non-deterministic duplicates.
2160    ///
2161    /// If the input and output value types are the same and do not require initialization then use
2162    /// [`Stream::reduce_keyed_commutative_idempotent`].
2163    ///
2164    /// # Example
2165    /// ```rust
2166    /// # use hydro_lang::prelude::*;
2167    /// # use futures::StreamExt;
2168    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2169    /// let tick = process.tick();
2170    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2171    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2172    /// batch
2173    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2174    ///     .all_ticks()
2175    /// # }, |mut stream| async move {
2176    /// // (1, false), (2, true)
2177    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2178    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2179    /// # }));
2180    /// ```
2181    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2182        self,
2183        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2184        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2185    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2186    where
2187        I: Fn() -> A + 'a,
2188        F: Fn(&mut A, V) + 'a,
2189    {
2190        self.into_keyed()
2191            .fold_commutative_idempotent(init, comb)
2192            .entries()
2193    }
2194
2195    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2196    /// # Example
2197    /// ```rust
2198    /// # use hydro_lang::prelude::*;
2199    /// # use futures::StreamExt;
2200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201    /// let tick = process.tick();
2202    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2203    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2204    /// batch.keys().all_ticks()
2205    /// # }, |mut stream| async move {
2206    /// // 1, 2
2207    /// # assert_eq!(stream.next().await.unwrap(), 1);
2208    /// # assert_eq!(stream.next().await.unwrap(), 2);
2209    /// # }));
2210    /// ```
2211    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2212        self.into_keyed()
2213            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2214            .keys()
2215    }
2216
2217    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2218    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2219    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2220    /// in the second element are accumulated via the `comb` closure.
2221    ///
2222    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2223    /// as there may be non-deterministic duplicates.
2224    ///
2225    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2226    ///
2227    /// # Example
2228    /// ```rust
2229    /// # use hydro_lang::prelude::*;
2230    /// # use futures::StreamExt;
2231    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2232    /// let tick = process.tick();
2233    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2234    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2235    /// batch
2236    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2237    ///     .all_ticks()
2238    /// # }, |mut stream| async move {
2239    /// // (1, false), (2, true)
2240    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2241    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2242    /// # }));
2243    /// ```
2244    pub fn reduce_keyed_commutative_idempotent<F>(
2245        self,
2246        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2247    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2248    where
2249        F: Fn(&mut V, V) + 'a,
2250    {
2251        self.into_keyed()
2252            .reduce_commutative_idempotent(comb)
2253            .entries()
2254    }
2255}
2256
2257impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2258where
2259    K: Eq + Hash,
2260    L: Location<'a>,
2261{
2262    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2263    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2264    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2265    /// in the second element are accumulated via the `comb` closure.
2266    ///
2267    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2268    ///
2269    /// If the input and output value types are the same and do not require initialization then use
2270    /// [`Stream::reduce_keyed_commutative`].
2271    ///
2272    /// # Example
2273    /// ```rust
2274    /// # use hydro_lang::prelude::*;
2275    /// # use futures::StreamExt;
2276    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2277    /// let tick = process.tick();
2278    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2279    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2280    /// batch
2281    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2282    ///     .all_ticks()
2283    /// # }, |mut stream| async move {
2284    /// // (1, 5), (2, 7)
2285    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2286    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2287    /// # }));
2288    /// ```
2289    pub fn fold_keyed_commutative<A, I, F>(
2290        self,
2291        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2292        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2293    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2294    where
2295        I: Fn() -> A + 'a,
2296        F: Fn(&mut A, V) + 'a,
2297    {
2298        self.into_keyed().fold_commutative(init, comb).entries()
2299    }
2300
2301    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2302    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2303    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2304    /// in the second element are accumulated via the `comb` closure.
2305    ///
2306    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2307    ///
2308    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2309    ///
2310    /// # Example
2311    /// ```rust
2312    /// # use hydro_lang::prelude::*;
2313    /// # use futures::StreamExt;
2314    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2315    /// let tick = process.tick();
2316    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2317    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2318    /// batch
2319    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2320    ///     .all_ticks()
2321    /// # }, |mut stream| async move {
2322    /// // (1, 5), (2, 7)
2323    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2324    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2325    /// # }));
2326    /// ```
2327    pub fn reduce_keyed_commutative<F>(
2328        self,
2329        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2330    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2331    where
2332        F: Fn(&mut V, V) + 'a,
2333    {
2334        self.into_keyed().reduce_commutative(comb).entries()
2335    }
2336}
2337
2338impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2339where
2340    K: Eq + Hash,
2341    L: Location<'a>,
2342{
2343    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2344    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2345    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2346    /// in the second element are accumulated via the `comb` closure.
2347    ///
2348    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2349    ///
2350    /// If the input and output value types are the same and do not require initialization then use
2351    /// [`Stream::reduce_keyed_idempotent`].
2352    ///
2353    /// # Example
2354    /// ```rust
2355    /// # use hydro_lang::prelude::*;
2356    /// # use futures::StreamExt;
2357    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2358    /// let tick = process.tick();
2359    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2360    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2361    /// batch
2362    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2363    ///     .all_ticks()
2364    /// # }, |mut stream| async move {
2365    /// // (1, false), (2, true)
2366    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2367    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2368    /// # }));
2369    /// ```
2370    pub fn fold_keyed_idempotent<A, I, F>(
2371        self,
2372        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2373        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2374    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2375    where
2376        I: Fn() -> A + 'a,
2377        F: Fn(&mut A, V) + 'a,
2378    {
2379        self.into_keyed().fold_idempotent(init, comb).entries()
2380    }
2381
2382    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2383    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2384    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2385    /// in the second element are accumulated via the `comb` closure.
2386    ///
2387    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2388    ///
2389    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2390    ///
2391    /// # Example
2392    /// ```rust
2393    /// # use hydro_lang::prelude::*;
2394    /// # use futures::StreamExt;
2395    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2396    /// let tick = process.tick();
2397    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2398    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2399    /// batch
2400    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2401    ///     .all_ticks()
2402    /// # }, |mut stream| async move {
2403    /// // (1, false), (2, true)
2404    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2405    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2406    /// # }));
2407    /// ```
2408    pub fn reduce_keyed_idempotent<F>(
2409        self,
2410        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2411    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2412    where
2413        F: Fn(&mut V, V) + 'a,
2414    {
2415        self.into_keyed().reduce_idempotent(comb).entries()
2416    }
2417}
2418
2419impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2420where
2421    L: Location<'a> + NoTick,
2422{
2423    /// Returns a stream corresponding to the latest batch of elements being atomically
2424    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2425    /// the order of the input.
2426    ///
2427    /// # Non-Determinism
2428    /// The batch boundaries are non-deterministic and may change across executions.
2429    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2430        Stream::new(
2431            self.location.clone().tick,
2432            HydroNode::Batch {
2433                inner: Box::new(self.ir_node.into_inner()),
2434                metadata: self
2435                    .location
2436                    .tick
2437                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2438            },
2439        )
2440    }
2441
2442    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2443    /// See [`Stream::atomic`] for more details.
2444    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2445        Stream::new(
2446            self.location.tick.l.clone(),
2447            HydroNode::EndAtomic {
2448                inner: Box::new(self.ir_node.into_inner()),
2449                metadata: self
2450                    .location
2451                    .tick
2452                    .l
2453                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2454            },
2455        )
2456    }
2457}
2458
2459impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2460where
2461    L: Location<'a>,
2462{
2463    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2464    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2465    ///
2466    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2467    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2468    /// argument that declares where the stream will be atomically processed. Batching a stream into
2469    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2470    /// [`Tick`] will introduce asynchrony.
2471    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2472        let out_location = Atomic { tick: tick.clone() };
2473        Stream::new(
2474            out_location.clone(),
2475            HydroNode::BeginAtomic {
2476                inner: Box::new(self.ir_node.into_inner()),
2477                metadata: out_location
2478                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2479            },
2480        )
2481    }
2482
2483    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2484    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2485    /// the order of the input. The output stream will execute in the [`Tick`] that was
2486    /// used to create the atomic section.
2487    ///
2488    /// # Non-Determinism
2489    /// The batch boundaries are non-deterministic and may change across executions.
2490    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2491        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2492        Stream::new(
2493            tick.clone(),
2494            HydroNode::Batch {
2495                inner: Box::new(self.ir_node.into_inner()),
2496                metadata: tick
2497                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2498            },
2499        )
2500    }
2501
2502    /// Given a time interval, returns a stream corresponding to samples taken from the
2503    /// stream roughly at that interval. The output will have elements in the same order
2504    /// as the input, but with arbitrary elements skipped between samples. There is also
2505    /// no guarantee on the exact timing of the samples.
2506    ///
2507    /// # Non-Determinism
2508    /// The output stream is non-deterministic in which elements are sampled, since this
2509    /// is controlled by a clock.
2510    pub fn sample_every(
2511        self,
2512        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2513        nondet: NonDet,
2514    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2515    where
2516        L: NoTick + NoAtomic,
2517    {
2518        let samples = self.location.source_interval(interval, nondet);
2519
2520        let tick = self.location.tick();
2521        self.batch(&tick, nondet)
2522            .filter_if_some(samples.batch(&tick, nondet).first())
2523            .all_ticks()
2524            .weakest_retries()
2525    }
2526
2527    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2528    /// stream has not emitted a value since that duration.
2529    ///
2530    /// # Non-Determinism
2531    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2532    /// samples take place, timeouts may be non-deterministically generated or missed,
2533    /// and the notification of the timeout may be delayed as well. There is also no
2534    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2535    /// detected based on when the next sample is taken.
2536    pub fn timeout(
2537        self,
2538        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2539        nondet: NonDet,
2540    ) -> Optional<(), L, Unbounded>
2541    where
2542        L: NoTick + NoAtomic,
2543    {
2544        let tick = self.location.tick();
2545
2546        let latest_received = self.assume_retries(nondet).fold_commutative(
2547            q!(|| None),
2548            q!(|latest, _| {
2549                *latest = Some(Instant::now());
2550            }),
2551        );
2552
2553        latest_received
2554            .snapshot(&tick, nondet)
2555            .filter_map(q!(move |latest_received| {
2556                if let Some(latest_received) = latest_received {
2557                    if Instant::now().duration_since(latest_received) > duration {
2558                        Some(())
2559                    } else {
2560                        None
2561                    }
2562                } else {
2563                    Some(())
2564                }
2565            }))
2566            .latest()
2567    }
2568}
2569
2570impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2571where
2572    L: Location<'a> + NoTick + NoAtomic,
2573    F: Future<Output = T>,
2574{
2575    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2576    /// Future outputs are produced as available, regardless of input arrival order.
2577    ///
2578    /// # Example
2579    /// ```rust
2580    /// # use std::collections::HashSet;
2581    /// # use futures::StreamExt;
2582    /// # use hydro_lang::prelude::*;
2583    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2584    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2585    ///     .map(q!(|x| async move {
2586    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2587    ///         x
2588    ///     }))
2589    ///     .resolve_futures()
2590    /// #   },
2591    /// #   |mut stream| async move {
2592    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2593    /// #       let mut output = HashSet::new();
2594    /// #       for _ in 1..10 {
2595    /// #           output.insert(stream.next().await.unwrap());
2596    /// #       }
2597    /// #       assert_eq!(
2598    /// #           output,
2599    /// #           HashSet::<i32>::from_iter(1..10)
2600    /// #       );
2601    /// #   },
2602    /// # ));
2603    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2604        Stream::new(
2605            self.location.clone(),
2606            HydroNode::ResolveFutures {
2607                input: Box::new(self.ir_node.into_inner()),
2608                metadata: self
2609                    .location
2610                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2611            },
2612        )
2613    }
2614
2615    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2616    /// Future outputs are produced in the same order as the input stream.
2617    ///
2618    /// # Example
2619    /// ```rust
2620    /// # use std::collections::HashSet;
2621    /// # use futures::StreamExt;
2622    /// # use hydro_lang::prelude::*;
2623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2624    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2625    ///     .map(q!(|x| async move {
2626    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2627    ///         x
2628    ///     }))
2629    ///     .resolve_futures_ordered()
2630    /// #   },
2631    /// #   |mut stream| async move {
2632    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2633    /// #       let mut output = Vec::new();
2634    /// #       for _ in 1..10 {
2635    /// #           output.push(stream.next().await.unwrap());
2636    /// #       }
2637    /// #       assert_eq!(
2638    /// #           output,
2639    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2640    /// #       );
2641    /// #   },
2642    /// # ));
2643    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2644        Stream::new(
2645            self.location.clone(),
2646            HydroNode::ResolveFuturesOrdered {
2647                input: Box::new(self.ir_node.into_inner()),
2648                metadata: self
2649                    .location
2650                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2651            },
2652        )
2653    }
2654}
2655
2656impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2657where
2658    L: Location<'a> + NoTick,
2659{
2660    /// Executes the provided closure for every element in this stream.
2661    ///
2662    /// Because the closure may have side effects, the stream must have deterministic order
2663    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2664    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2665    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2666    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2667        let f = f.splice_fn1_ctx(&self.location).into();
2668        self.location
2669            .flow_state()
2670            .borrow_mut()
2671            .push_root(HydroRoot::ForEach {
2672                input: Box::new(self.ir_node.into_inner()),
2673                f,
2674                op_metadata: HydroIrOpMetadata::new(),
2675            });
2676    }
2677
2678    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2679    /// TCP socket to some other server. You should _not_ use this API for interacting with
2680    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2681    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2682    /// interaction with asynchronous sinks.
2683    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2684    where
2685        S: 'a + futures::Sink<T> + Unpin,
2686    {
2687        self.location
2688            .flow_state()
2689            .borrow_mut()
2690            .push_root(HydroRoot::DestSink {
2691                sink: sink.splice_typed_ctx(&self.location).into(),
2692                input: Box::new(self.ir_node.into_inner()),
2693                op_metadata: HydroIrOpMetadata::new(),
2694            });
2695    }
2696}
2697
2698impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2699where
2700    L: Location<'a>,
2701{
2702    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2703    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2704    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2705        Stream::new(
2706            self.location.outer().clone(),
2707            HydroNode::YieldConcat {
2708                inner: Box::new(self.ir_node.into_inner()),
2709                metadata: self
2710                    .location
2711                    .outer()
2712                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2713            },
2714        )
2715    }
2716
2717    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2718    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2719    ///
2720    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2721    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2722    /// stream's [`Tick`] context.
2723    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2724        let out_location = Atomic {
2725            tick: self.location.clone(),
2726        };
2727
2728        Stream::new(
2729            out_location.clone(),
2730            HydroNode::YieldConcat {
2731                inner: Box::new(self.ir_node.into_inner()),
2732                metadata: out_location
2733                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2734            },
2735        )
2736    }
2737
2738    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2739    ///
2740    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2741    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2742    /// careful when using this operator, as its memory usage will grow linearly over time since it
2743    /// must store its inputs indefinitely.
2744    ///
2745    /// # Example
2746    /// ```rust
2747    /// # use hydro_lang::prelude::*;
2748    /// # use futures::StreamExt;
2749    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2750    /// let tick = process.tick();
2751    /// // ticks are lazy by default, forces the second tick to run
2752    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2753    ///
2754    /// let batch_first_tick = process
2755    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2756    ///   .batch(&tick, nondet!(/** test */));
2757    /// let batch_second_tick = process
2758    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2759    ///   .batch(&tick, nondet!(/** test */))
2760    ///   .defer_tick(); // appears on the second tick
2761    /// batch_first_tick.chain(batch_second_tick)
2762    ///   .persist()
2763    ///   .all_ticks()
2764    /// # }, |mut stream| async move {
2765    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2766    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2767    /// #     assert_eq!(stream.next().await.unwrap(), w);
2768    /// # }
2769    /// # }));
2770    /// ```
2771    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2772    where
2773        T: Clone,
2774    {
2775        Stream::new(
2776            self.location.clone(),
2777            HydroNode::Persist {
2778                inner: Box::new(self.ir_node.into_inner()),
2779                metadata: self
2780                    .location
2781                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2782            },
2783        )
2784    }
2785
2786    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2787    /// always has the elements of `self` at tick `T - 1`.
2788    ///
2789    /// At tick `0`, the output stream is empty, since there is no previous tick.
2790    ///
2791    /// This operator enables stateful iterative processing with ticks, by sending data from one
2792    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2793    ///
2794    /// # Example
2795    /// ```rust
2796    /// # use hydro_lang::prelude::*;
2797    /// # use futures::StreamExt;
2798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2799    /// let tick = process.tick();
2800    /// // ticks are lazy by default, forces the second tick to run
2801    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2802    ///
2803    /// let batch_first_tick = process
2804    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2805    ///   .batch(&tick, nondet!(/** test */));
2806    /// let batch_second_tick = process
2807    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2808    ///   .batch(&tick, nondet!(/** test */))
2809    ///   .defer_tick(); // appears on the second tick
2810    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2811    ///
2812    /// changes_across_ticks.clone().filter_not_in(
2813    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2814    /// ).all_ticks()
2815    /// # }, |mut stream| async move {
2816    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2817    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2818    /// #     assert_eq!(stream.next().await.unwrap(), w);
2819    /// # }
2820    /// # }));
2821    /// ```
2822    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2823        Stream::new(
2824            self.location.clone(),
2825            HydroNode::DeferTick {
2826                input: Box::new(self.ir_node.into_inner()),
2827                metadata: self
2828                    .location
2829                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2830            },
2831        )
2832    }
2833}
2834
2835#[cfg(test)]
2836mod tests {
2837    #[cfg(feature = "deploy")]
2838    use futures::{SinkExt, StreamExt};
2839    #[cfg(feature = "deploy")]
2840    use hydro_deploy::Deployment;
2841    #[cfg(feature = "deploy")]
2842    use serde::{Deserialize, Serialize};
2843    #[cfg(feature = "deploy")]
2844    use stageleft::q;
2845
2846    use crate::compile::builder::FlowBuilder;
2847    #[cfg(feature = "deploy")]
2848    use crate::live_collections::stream::ExactlyOnce;
2849    use crate::live_collections::stream::{NoOrder, TotalOrder};
2850    use crate::location::Location;
2851    use crate::nondet::nondet;
2852
2853    mod backtrace_chained_ops;
2854
2855    #[cfg(feature = "deploy")]
2856    struct P1 {}
2857    #[cfg(feature = "deploy")]
2858    struct P2 {}
2859
2860    #[cfg(feature = "deploy")]
2861    #[derive(Serialize, Deserialize, Debug)]
2862    struct SendOverNetwork {
2863        n: u32,
2864    }
2865
2866    #[cfg(feature = "deploy")]
2867    #[tokio::test]
2868    async fn first_ten_distributed() {
2869        let mut deployment = Deployment::new();
2870
2871        let flow = FlowBuilder::new();
2872        let first_node = flow.process::<P1>();
2873        let second_node = flow.process::<P2>();
2874        let external = flow.external::<P2>();
2875
2876        let numbers = first_node.source_iter(q!(0..10));
2877        let out_port = numbers
2878            .map(q!(|n| SendOverNetwork { n }))
2879            .send_bincode(&second_node)
2880            .send_bincode_external(&external);
2881
2882        let nodes = flow
2883            .with_process(&first_node, deployment.Localhost())
2884            .with_process(&second_node, deployment.Localhost())
2885            .with_external(&external, deployment.Localhost())
2886            .deploy(&mut deployment);
2887
2888        deployment.deploy().await.unwrap();
2889
2890        let mut external_out = nodes.connect(out_port).await;
2891
2892        deployment.start().await.unwrap();
2893
2894        for i in 0..10 {
2895            assert_eq!(external_out.next().await.unwrap().n, i);
2896        }
2897    }
2898
2899    #[cfg(feature = "deploy")]
2900    #[tokio::test]
2901    async fn first_cardinality() {
2902        let mut deployment = Deployment::new();
2903
2904        let flow = FlowBuilder::new();
2905        let node = flow.process::<()>();
2906        let external = flow.external::<()>();
2907
2908        let node_tick = node.tick();
2909        let count = node_tick
2910            .singleton(q!([1, 2, 3]))
2911            .into_stream()
2912            .flatten_ordered()
2913            .first()
2914            .into_stream()
2915            .count()
2916            .all_ticks()
2917            .send_bincode_external(&external);
2918
2919        let nodes = flow
2920            .with_process(&node, deployment.Localhost())
2921            .with_external(&external, deployment.Localhost())
2922            .deploy(&mut deployment);
2923
2924        deployment.deploy().await.unwrap();
2925
2926        let mut external_out = nodes.connect(count).await;
2927
2928        deployment.start().await.unwrap();
2929
2930        assert_eq!(external_out.next().await.unwrap(), 1);
2931    }
2932
2933    #[cfg(feature = "deploy")]
2934    #[tokio::test]
2935    async fn unbounded_reduce_remembers_state() {
2936        let mut deployment = Deployment::new();
2937
2938        let flow = FlowBuilder::new();
2939        let node = flow.process::<()>();
2940        let external = flow.external::<()>();
2941
2942        let (input_port, input) = node.source_external_bincode(&external);
2943        let out = input
2944            .reduce(q!(|acc, v| *acc += v))
2945            .sample_eager(nondet!(/** test */))
2946            .send_bincode_external(&external);
2947
2948        let nodes = flow
2949            .with_process(&node, deployment.Localhost())
2950            .with_external(&external, deployment.Localhost())
2951            .deploy(&mut deployment);
2952
2953        deployment.deploy().await.unwrap();
2954
2955        let mut external_in = nodes.connect(input_port).await;
2956        let mut external_out = nodes.connect(out).await;
2957
2958        deployment.start().await.unwrap();
2959
2960        external_in.send(1).await.unwrap();
2961        assert_eq!(external_out.next().await.unwrap(), 1);
2962
2963        external_in.send(2).await.unwrap();
2964        assert_eq!(external_out.next().await.unwrap(), 3);
2965    }
2966
2967    #[cfg(feature = "deploy")]
2968    #[tokio::test]
2969    async fn atomic_fold_replays_each_tick() {
2970        let mut deployment = Deployment::new();
2971
2972        let flow = FlowBuilder::new();
2973        let node = flow.process::<()>();
2974        let external = flow.external::<()>();
2975
2976        let (input_port, input) =
2977            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2978        let tick = node.tick();
2979
2980        let out = input
2981            .batch(&tick, nondet!(/** test */))
2982            .cross_singleton(
2983                node.source_iter(q!(vec![1, 2, 3]))
2984                    .atomic(&tick)
2985                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2986                    .snapshot_atomic(nondet!(/** test */)),
2987            )
2988            .all_ticks()
2989            .send_bincode_external(&external);
2990
2991        let nodes = flow
2992            .with_process(&node, deployment.Localhost())
2993            .with_external(&external, deployment.Localhost())
2994            .deploy(&mut deployment);
2995
2996        deployment.deploy().await.unwrap();
2997
2998        let mut external_in = nodes.connect(input_port).await;
2999        let mut external_out = nodes.connect(out).await;
3000
3001        deployment.start().await.unwrap();
3002
3003        external_in.send(1).await.unwrap();
3004        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3005
3006        external_in.send(2).await.unwrap();
3007        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3008    }
3009
3010    #[cfg(feature = "deploy")]
3011    #[tokio::test]
3012    async fn unbounded_scan_remembers_state() {
3013        let mut deployment = Deployment::new();
3014
3015        let flow = FlowBuilder::new();
3016        let node = flow.process::<()>();
3017        let external = flow.external::<()>();
3018
3019        let (input_port, input) = node.source_external_bincode(&external);
3020        let out = input
3021            .scan(
3022                q!(|| 0),
3023                q!(|acc, v| {
3024                    *acc += v;
3025                    Some(*acc)
3026                }),
3027            )
3028            .send_bincode_external(&external);
3029
3030        let nodes = flow
3031            .with_process(&node, deployment.Localhost())
3032            .with_external(&external, deployment.Localhost())
3033            .deploy(&mut deployment);
3034
3035        deployment.deploy().await.unwrap();
3036
3037        let mut external_in = nodes.connect(input_port).await;
3038        let mut external_out = nodes.connect(out).await;
3039
3040        deployment.start().await.unwrap();
3041
3042        external_in.send(1).await.unwrap();
3043        assert_eq!(external_out.next().await.unwrap(), 1);
3044
3045        external_in.send(2).await.unwrap();
3046        assert_eq!(external_out.next().await.unwrap(), 3);
3047    }
3048
3049    #[cfg(feature = "deploy")]
3050    #[tokio::test]
3051    async fn unbounded_enumerate_remembers_state() {
3052        let mut deployment = Deployment::new();
3053
3054        let flow = FlowBuilder::new();
3055        let node = flow.process::<()>();
3056        let external = flow.external::<()>();
3057
3058        let (input_port, input) = node.source_external_bincode(&external);
3059        let out = input.enumerate().send_bincode_external(&external);
3060
3061        let nodes = flow
3062            .with_process(&node, deployment.Localhost())
3063            .with_external(&external, deployment.Localhost())
3064            .deploy(&mut deployment);
3065
3066        deployment.deploy().await.unwrap();
3067
3068        let mut external_in = nodes.connect(input_port).await;
3069        let mut external_out = nodes.connect(out).await;
3070
3071        deployment.start().await.unwrap();
3072
3073        external_in.send(1).await.unwrap();
3074        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3075
3076        external_in.send(2).await.unwrap();
3077        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3078    }
3079
3080    #[cfg(feature = "deploy")]
3081    #[tokio::test]
3082    async fn unbounded_unique_remembers_state() {
3083        let mut deployment = Deployment::new();
3084
3085        let flow = FlowBuilder::new();
3086        let node = flow.process::<()>();
3087        let external = flow.external::<()>();
3088
3089        let (input_port, input) =
3090            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3091        let out = input.unique().send_bincode_external(&external);
3092
3093        let nodes = flow
3094            .with_process(&node, deployment.Localhost())
3095            .with_external(&external, deployment.Localhost())
3096            .deploy(&mut deployment);
3097
3098        deployment.deploy().await.unwrap();
3099
3100        let mut external_in = nodes.connect(input_port).await;
3101        let mut external_out = nodes.connect(out).await;
3102
3103        deployment.start().await.unwrap();
3104
3105        external_in.send(1).await.unwrap();
3106        assert_eq!(external_out.next().await.unwrap(), 1);
3107
3108        external_in.send(2).await.unwrap();
3109        assert_eq!(external_out.next().await.unwrap(), 2);
3110
3111        external_in.send(1).await.unwrap();
3112        external_in.send(3).await.unwrap();
3113        assert_eq!(external_out.next().await.unwrap(), 3);
3114    }
3115
3116    #[test]
3117    #[should_panic]
3118    fn sim_batch_nondet_size() {
3119        let flow = FlowBuilder::new();
3120        let external = flow.external::<()>();
3121        let node = flow.process::<()>();
3122
3123        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);
3124
3125        let tick = node.tick();
3126        let out_port = input
3127            .batch(&tick, nondet!(/** test */))
3128            .count()
3129            .all_ticks()
3130            .send_bincode_external(&external);
3131
3132        flow.sim().exhaustive(async |mut compiled| {
3133            let in_send = compiled.connect(&port);
3134            let mut out_recv = compiled.connect(&out_port);
3135            compiled.launch();
3136
3137            in_send.send(());
3138            in_send.send(());
3139            in_send.send(());
3140
3141            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3142        });
3143    }
3144
3145    #[test]
3146    fn sim_batch_preserves_order() {
3147        let flow = FlowBuilder::new();
3148        let external = flow.external::<()>();
3149        let node = flow.process::<()>();
3150
3151        let (port, input) = node.source_external_bincode(&external);
3152
3153        let tick = node.tick();
3154        let out_port = input
3155            .batch(&tick, nondet!(/** test */))
3156            .all_ticks()
3157            .send_bincode_external(&external);
3158
3159        flow.sim().exhaustive(async |mut compiled| {
3160            let in_send = compiled.connect(&port);
3161            let out_recv = compiled.connect(&out_port);
3162            compiled.launch();
3163
3164            in_send.send(1);
3165            in_send.send(2);
3166            in_send.send(3);
3167
3168            out_recv.assert_yields_only([1, 2, 3]).await;
3169        });
3170    }
3171
3172    #[test]
3173    #[should_panic]
3174    fn sim_batch_unordered_shuffles() {
3175        let flow = FlowBuilder::new();
3176        let external = flow.external::<()>();
3177        let node = flow.process::<()>();
3178
3179        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3180
3181        let tick = node.tick();
3182        let batch = input.batch(&tick, nondet!(/** test */));
3183        let out_port = batch
3184            .clone()
3185            .min()
3186            .zip(batch.max())
3187            .all_ticks()
3188            .send_bincode_external(&external);
3189
3190        flow.sim().exhaustive(async |mut compiled| {
3191            let in_send = compiled.connect(&port);
3192            let out_recv = compiled.connect(&out_port);
3193            compiled.launch();
3194
3195            in_send.send_many_unordered([1, 2, 3]).unwrap();
3196
3197            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3198                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3199            }
3200        });
3201    }
3202
3203    #[test]
3204    fn sim_batch_unordered_shuffles_count() {
3205        let flow = FlowBuilder::new();
3206        let external = flow.external::<()>();
3207        let node = flow.process::<()>();
3208
3209        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3210
3211        let tick = node.tick();
3212        let batch = input.batch(&tick, nondet!(/** test */));
3213        let out_port = batch.all_ticks().send_bincode_external(&external);
3214
3215        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3216            let in_send = compiled.connect(&port);
3217            let out_recv = compiled.connect(&out_port);
3218            compiled.launch();
3219
3220            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3221            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3222        });
3223
3224        assert_eq!(
3225            instance_count,
3226            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3227        )
3228    }
3229
3230    #[test]
3231    #[ignore = "assume_ordering not yet supported on bounded collections"]
3232    fn sim_observe_order_batched_count() {
3233        let flow = FlowBuilder::new();
3234        let external = flow.external::<()>();
3235        let node = flow.process::<()>();
3236
3237        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3238
3239        let tick = node.tick();
3240        let batch = input.batch(&tick, nondet!(/** test */));
3241        let out_port = batch
3242            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3243            .all_ticks()
3244            .send_bincode_external(&external);
3245
3246        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3247            let in_send = compiled.connect(&port);
3248            let out_recv = compiled.connect(&out_port);
3249            compiled.launch();
3250
3251            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3252            let _ = out_recv.collect::<Vec<_>>().await;
3253        });
3254
3255        assert_eq!(
3256            instance_count,
3257            192 // 4! * 2^{4 - 1}
3258        )
3259    }
3260
3261    #[test]
3262    fn sim_unordered_count_instance_count() {
3263        let flow = FlowBuilder::new();
3264        let external = flow.external::<()>();
3265        let node = flow.process::<()>();
3266
3267        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3268
3269        let tick = node.tick();
3270        let out_port = input
3271            .count()
3272            .snapshot(&tick, nondet!(/** test */))
3273            .all_ticks()
3274            .send_bincode_external(&external);
3275
3276        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3277            let in_send = compiled.connect(&port);
3278            let out_recv = compiled.connect(&out_port);
3279            compiled.launch();
3280
3281            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3282            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3283        });
3284
3285        assert_eq!(
3286            instance_count,
3287            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3288        )
3289    }
3290}