hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds require every marker to provide [`MinOrder`] results that
/// behave like a minimum: combining a marker with itself or with [`TotalOrder`]
/// yields the marker unchanged, while combining it with [`NoOrder`] always yields
/// [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
40
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // Value-level counterpart of this marker, as used by the IR's `StreamOrder`.
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
50
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // Value-level counterpart of this marker, as used by the IR's `StreamOrder`.
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
62
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` is one ordering marker and `Other` is the marker it is being combined with;
/// the result is exposed as the associated type [`MinOrder::Min`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
69
// The four impls below enumerate every pairing of the two ordering markers.
// The result is `TotalOrder` only when both sides are `TotalOrder`.

// TotalOrder combined with NoOrder: the weaker guarantee wins.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder combined with TotalOrder: the guarantee is preserved.
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder combined with TotalOrder: the weaker guarantee wins.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder combined with NoOrder: stays unordered.
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
89
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds require every marker to provide [`MinRetries`] results that
/// behave like a minimum: combining a marker with itself or with [`ExactlyOnce`]
/// yields the marker unchanged, while combining it with [`AtLeastOnce`] always yields
/// [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
100
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // Value-level counterpart of this marker, as used by the IR's `StreamRetry`.
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
109
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it can only be used as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // Value-level counterpart of this marker, as used by the IR's `StreamRetry`.
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
118
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` is one retry marker and `Other` is the marker it is being combined with;
/// the result is exposed as the associated type [`MinRetries::Min`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
125
// The four impls below enumerate every pairing of the two retry markers.
// The result is `ExactlyOnce` only when both sides are `ExactlyOnce`.

// ExactlyOnce combined with AtLeastOnce: the weaker guarantee wins.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce combined with ExactlyOnce: the guarantee is preserved.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce combined with ExactlyOnce: the weaker guarantee wins.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce combined with AtLeastOnce: stays at-least-once.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is being materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. It lives in a [`RefCell`] so that it can be
    /// swapped through a shared reference (the `Clone` impl replaces it with a `Tee` node).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that ties the type parameters to the struct; most of them are
    // not stored in any field.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179    for Stream<T, L, Unbounded, O, R>
180where
181    L: Location<'a>,
182{
183    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184        Stream {
185            location: stream.location,
186            ir_node: stream.ir_node,
187            _phantom: PhantomData,
188        }
189    }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193    for Stream<T, L, B, NoOrder, R>
194where
195    L: Location<'a>,
196{
197    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198        Stream {
199            location: stream.location,
200            ir_node: stream.ir_node,
201            _phantom: PhantomData,
202        }
203    }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207    for Stream<T, L, B, O, AtLeastOnce>
208where
209    L: Location<'a>,
210{
211    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212        Stream {
213            location: stream.location,
214            ir_node: stream.ir_node,
215            _phantom: PhantomData,
216        }
217    }
218}
219
// `DeferTick` is implemented for bounded streams that live inside a `Tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably this resolves to an inherent `Stream::defer_tick`
        // defined elsewhere in the file (not visible in this chunk) rather than recursing
        // into this trait method — confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230    for Stream<T, Tick<L>, Bounded, O, R>
231where
232    L: Location<'a>,
233{
234    type Location = Tick<L>;
235
236    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237        Stream::new(
238            location.clone(),
239            HydroNode::CycleSource {
240                ident,
241                metadata: location.new_node_metadata(Self::collection_kind()),
242            },
243        )
244    }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248    for Stream<T, Tick<L>, Bounded, O, R>
249where
250    L: Location<'a>,
251{
252    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253        assert_eq!(
254            Location::id(&self.location),
255            expected_location,
256            "locations do not match"
257        );
258        self.location
259            .flow_state()
260            .borrow_mut()
261            .push_root(HydroRoot::CycleSink {
262                ident,
263                input: Box::new(self.ir_node.into_inner()),
264                op_metadata: HydroIrOpMetadata::new(),
265            });
266    }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270    for Stream<T, L, B, O, R>
271where
272    L: Location<'a> + NoTick,
273{
274    type Location = L;
275
276    fn create_source(ident: syn::Ident, location: L) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                ident,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288    for Stream<T, L, B, O, R>
289where
290    L: Location<'a> + NoTick,
291{
292    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293        assert_eq!(
294            Location::id(&self.location),
295            expected_location,
296            "locations do not match"
297        );
298        self.location
299            .flow_state()
300            .borrow_mut()
301            .push_root(HydroRoot::CycleSink {
302                ident,
303                input: Box::new(self.ir_node.into_inner()),
304                op_metadata: HydroIrOpMetadata::new(),
305            });
306    }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311    T: Clone,
312    L: Location<'a>,
313{
314    fn clone(&self) -> Self {
315        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317            *self.ir_node.borrow_mut() = HydroNode::Tee {
318                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319                metadata: self.location.new_node_metadata(Self::collection_kind()),
320            };
321        }
322
323        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324            Stream {
325                location: self.location.clone(),
326                ir_node: HydroNode::Tee {
327                    inner: TeeNode(inner.0.clone()),
328                    metadata: metadata.clone(),
329                }
330                .into(),
331                _phantom: PhantomData,
332            }
333        } else {
334            unreachable!()
335        }
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341    L: Location<'a>,
342{
    /// Wraps `ir_node` into a stream located at `location`.
    ///
    /// In builds with debug assertions enabled, this checks that the node's metadata
    /// records the same location id as `location` and the same collection kind as the
    /// stream type being constructed.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        // The assertion arguments are only evaluated when debug assertions are enabled,
        // so they are intentionally not hoisted into local bindings.
        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
353
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is borrowed from the stream, not cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
358
    /// Builds the [`CollectionKind`] describing this stream type: its boundedness,
    /// ordering and retry kinds (taken from the marker type parameters) and the quoted
    /// element type `T`.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: stageleft::quote_type::<T>().into(),
        }
    }
367
368    /// Produces a stream based on invoking `f` on each element.
369    /// If you do not want to modify the stream and instead only want to view
370    /// each item use [`Stream::inspect`] instead.
371    ///
372    /// # Example
373    /// ```rust
374    /// # use hydro_lang::prelude::*;
375    /// # use futures::StreamExt;
376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377    /// let words = process.source_iter(q!(vec!["hello", "world"]));
378    /// words.map(q!(|x| x.to_uppercase()))
379    /// # }, |mut stream| async move {
380    /// # for w in vec!["HELLO", "WORLD"] {
381    /// #     assert_eq!(stream.next().await.unwrap(), w);
382    /// # }
383    /// # }));
384    /// ```
385    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386    where
387        F: Fn(T) -> U + 'a,
388    {
389        let f = f.splice_fn1_ctx(&self.location).into();
390        Stream::new(
391            self.location.clone(),
392            HydroNode::Map {
393                f,
394                input: Box::new(self.ir_node.into_inner()),
395                metadata: self
396                    .location
397                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398            },
399        )
400    }
401
402    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404    /// for the output type `U` must produce items in a **deterministic** order.
405    ///
406    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408    ///
409    /// # Example
410    /// ```rust
411    /// # use hydro_lang::prelude::*;
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414    /// process
415    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416    ///     .flat_map_ordered(q!(|x| x))
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, 4
419    /// # for w in (1..5) {
420    /// #     assert_eq!(stream.next().await.unwrap(), w);
421    /// # }
422    /// # }));
423    /// ```
424    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425    where
426        I: IntoIterator<Item = U>,
427        F: Fn(T) -> I + 'a,
428    {
429        let f = f.splice_fn1_ctx(&self.location).into();
430        Stream::new(
431            self.location.clone(),
432            HydroNode::FlatMap {
433                f,
434                input: Box::new(self.ir_node.into_inner()),
435                metadata: self
436                    .location
437                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438            },
439        )
440    }
441
442    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443    /// for the output type `U` to produce items in any order.
444    ///
445    /// # Example
446    /// ```rust
447    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448    /// # use futures::StreamExt;
449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450    /// process
451    ///     .source_iter(q!(vec![
452    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453    ///         std::collections::HashSet::from_iter(vec![3, 4]),
454    ///     ]))
455    ///     .flat_map_unordered(q!(|x| x))
456    /// # }, |mut stream| async move {
457    /// // 1, 2, 3, 4, but in no particular order
458    /// # let mut results = Vec::new();
459    /// # for w in (1..5) {
460    /// #     results.push(stream.next().await.unwrap());
461    /// # }
462    /// # results.sort();
463    /// # assert_eq!(results, vec![1, 2, 3, 4]);
464    /// # }));
465    /// ```
466    pub fn flat_map_unordered<U, I, F>(
467        self,
468        f: impl IntoQuotedMut<'a, F, L>,
469    ) -> Stream<U, L, B, NoOrder, R>
470    where
471        I: IntoIterator<Item = U>,
472        F: Fn(T) -> I + 'a,
473    {
474        let f = f.splice_fn1_ctx(&self.location).into();
475        Stream::new(
476            self.location.clone(),
477            HydroNode::FlatMap {
478                f,
479                input: Box::new(self.ir_node.into_inner()),
480                metadata: self
481                    .location
482                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483            },
484        )
485    }
486
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// This is [`Stream::flat_map_ordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
513
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// This is [`Stream::flat_map_unordered`] with the identity function, so the output
    /// stream is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
544
545    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546    /// `f`, preserving the order of the elements.
547    ///
548    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549    /// not modify or take ownership of the values. If you need to modify the values while filtering
550    /// use [`Stream::filter_map`] instead.
551    ///
552    /// # Example
553    /// ```rust
554    /// # use hydro_lang::prelude::*;
555    /// # use futures::StreamExt;
556    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557    /// process
558    ///     .source_iter(q!(vec![1, 2, 3, 4]))
559    ///     .filter(q!(|&x| x > 2))
560    /// # }, |mut stream| async move {
561    /// // 3, 4
562    /// # for w in (3..5) {
563    /// #     assert_eq!(stream.next().await.unwrap(), w);
564    /// # }
565    /// # }));
566    /// ```
567    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568    where
569        F: Fn(&T) -> bool + 'a,
570    {
571        let f = f.splice_fn1_borrow_ctx(&self.location).into();
572        Stream::new(
573            self.location.clone(),
574            HydroNode::Filter {
575                f,
576                input: Box::new(self.ir_node.into_inner()),
577                metadata: self.location.new_node_metadata(Self::collection_kind()),
578            },
579        )
580    }
581
582    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583    ///
584    /// # Example
585    /// ```rust
586    /// # use hydro_lang::prelude::*;
587    /// # use futures::StreamExt;
588    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589    /// process
590    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
591    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
592    /// # }, |mut stream| async move {
593    /// // 1, 2
594    /// # for w in (1..3) {
595    /// #     assert_eq!(stream.next().await.unwrap(), w);
596    /// # }
597    /// # }));
598    /// ```
599    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600    where
601        F: Fn(T) -> Option<U> + 'a,
602    {
603        let f = f.splice_fn1_ctx(&self.location).into();
604        Stream::new(
605            self.location.clone(),
606            HydroNode::FilterMap {
607                f,
608                input: Box::new(self.ir_node.into_inner()),
609                metadata: self
610                    .location
611                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612            },
613        )
614    }
615
616    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617    /// where `x` is the final value of `other`, a bounded [`Singleton`].
618    ///
619    /// # Example
620    /// ```rust
621    /// # use hydro_lang::prelude::*;
622    /// # use futures::StreamExt;
623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
624    /// let tick = process.tick();
625    /// let batch = process
626    ///   .source_iter(q!(vec![1, 2, 3, 4]))
627    ///   .batch(&tick, nondet!(/** test */));
628    /// let count = batch.clone().count(); // `count()` returns a singleton
629    /// batch.cross_singleton(count).all_ticks()
630    /// # }, |mut stream| async move {
631    /// // (1, 4), (2, 4), (3, 4), (4, 4)
632    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
633    /// #     assert_eq!(stream.next().await.unwrap(), w);
634    /// # }
635    /// # }));
636    /// ```
637    pub fn cross_singleton<O2>(
638        self,
639        other: impl Into<Optional<O2, L, Bounded>>,
640    ) -> Stream<(T, O2), L, B, O, R>
641    where
642        O2: Clone,
643    {
644        let other: Optional<O2, L, Bounded> = other.into();
645        check_matching_location(&self.location, &other.location);
646
647        Stream::new(
648            self.location.clone(),
649            HydroNode::CrossSingleton {
650                left: Box::new(self.ir_node.into_inner()),
651                right: Box::new(other.ir_node.into_inner()),
652                metadata: self
653                    .location
654                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
655            },
656        )
657    }
658
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_some(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Reduce the signal's payload to `()`, pair every element with it via
        // `cross_singleton`, then drop the unit again so only the element remains.
        self.cross_singleton(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
695
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_none(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Invert the signal and reuse `filter_if_some`.
        // NOTE(review): presumably `into_singleton` yields a singleton holding an
        // `Option<()>`, so the filter keeps a value only when `other` was null —
        // confirm against `Optional::into_singleton`.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
736
737    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
738    /// tupled pairs in a non-deterministic order.
739    ///
740    /// # Example
741    /// ```rust
742    /// # use hydro_lang::prelude::*;
743    /// # use std::collections::HashSet;
744    /// # use futures::StreamExt;
745    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746    /// let tick = process.tick();
747    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
748    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
749    /// stream1.cross_product(stream2)
750    /// # }, |mut stream| async move {
751    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
752    /// # stream.map(|i| assert!(expected.contains(&i)));
753    /// # }));
754    /// ```
755    pub fn cross_product<T2, O2: Ordering>(
756        self,
757        other: Stream<T2, L, B, O2, R>,
758    ) -> Stream<(T, T2), L, B, NoOrder, R>
759    where
760        T: Clone,
761        T2: Clone,
762    {
763        check_matching_location(&self.location, &other.location);
764
765        Stream::new(
766            self.location.clone(),
767            HydroNode::CrossProduct {
768                left: Box::new(self.ir_node.into_inner()),
769                right: Box::new(other.ir_node.into_inner()),
770                metadata: self
771                    .location
772                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
773            },
774        )
775    }
776
777    /// Takes one stream as input and filters out any duplicate occurrences. The output
778    /// contains all unique values from the input.
779    ///
780    /// # Example
781    /// ```rust
782    /// # use hydro_lang::prelude::*;
783    /// # use futures::StreamExt;
784    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
785    /// let tick = process.tick();
786    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
787    /// # }, |mut stream| async move {
788    /// # for w in vec![1, 2, 3, 4] {
789    /// #     assert_eq!(stream.next().await.unwrap(), w);
790    /// # }
791    /// # }));
792    /// ```
793    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
794    where
795        T: Eq + Hash,
796    {
797        Stream::new(
798            self.location.clone(),
799            HydroNode::Unique {
800                input: Box::new(self.ir_node.into_inner()),
801                metadata: self
802                    .location
803                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
804            },
805        )
806    }
807
808    /// Outputs everything in this stream that is *not* contained in the `other` stream.
809    ///
810    /// The `other` stream must be [`Bounded`], since this function will wait until
811    /// all its elements are available before producing any output.
812    /// # Example
813    /// ```rust
814    /// # use hydro_lang::prelude::*;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817    /// let tick = process.tick();
818    /// let stream = process
819    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
820    ///   .batch(&tick, nondet!(/** test */));
821    /// let batch = process
822    ///   .source_iter(q!(vec![1, 2]))
823    ///   .batch(&tick, nondet!(/** test */));
824    /// stream.filter_not_in(batch).all_ticks()
825    /// # }, |mut stream| async move {
826    /// # for w in vec![3, 4] {
827    /// #     assert_eq!(stream.next().await.unwrap(), w);
828    /// # }
829    /// # }));
830    /// ```
831    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
832    where
833        T: Eq + Hash,
834    {
835        check_matching_location(&self.location, &other.location);
836
837        Stream::new(
838            self.location.clone(),
839            HydroNode::Difference {
840                pos: Box::new(self.ir_node.into_inner()),
841                neg: Box::new(other.ir_node.into_inner()),
842                metadata: self
843                    .location
844                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
845            },
846        )
847    }
848
849    /// An operator which allows you to "inspect" each element of a stream without
850    /// modifying it. The closure `f` is called on a reference to each item. This is
851    /// mainly useful for debugging, and should not be used to generate side-effects.
852    ///
853    /// # Example
854    /// ```rust
855    /// # use hydro_lang::prelude::*;
856    /// # use futures::StreamExt;
857    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
858    /// let nums = process.source_iter(q!(vec![1, 2]));
859    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
860    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
861    /// # }, |mut stream| async move {
862    /// # for w in vec![1, 2] {
863    /// #     assert_eq!(stream.next().await.unwrap(), w);
864    /// # }
865    /// # }));
866    /// ```
867    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
868    where
869        F: Fn(&T) + 'a,
870    {
871        let f = f.splice_fn1_borrow_ctx(&self.location).into();
872
873        Stream::new(
874            self.location.clone(),
875            HydroNode::Inspect {
876                f,
877                input: Box::new(self.ir_node.into_inner()),
878                metadata: self.location.new_node_metadata(Self::collection_kind()),
879            },
880        )
881    }
882
883    /// An operator which allows you to "name" a `HydroNode`.
884    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
885    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
886        {
887            let mut node = self.ir_node.borrow_mut();
888            let metadata = node.metadata_mut();
889            metadata.tag = Some(name.to_string());
890        }
891        self
892    }
893
894    /// Explicitly "casts" the stream to a type with a different ordering
895    /// guarantee. Useful in unsafe code where the ordering cannot be proven
896    /// by the type-system.
897    ///
898    /// # Non-Determinism
899    /// This function is used as an escape hatch, and any mistakes in the
900    /// provided ordering guarantee will propagate into the guarantees
901    /// for the rest of the program.
902    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
903        if O::ORDERING_KIND == O2::ORDERING_KIND {
904            Stream::new(self.location, self.ir_node.into_inner())
905        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
906            // We can always weaken the ordering guarantee
907            Stream::new(
908                self.location.clone(),
909                HydroNode::Cast {
910                    inner: Box::new(self.ir_node.into_inner()),
911                    metadata: self
912                        .location
913                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
914                },
915            )
916        } else {
917            Stream::new(
918                self.location.clone(),
919                HydroNode::ObserveNonDet {
920                    inner: Box::new(self.ir_node.into_inner()),
921                    metadata: self
922                        .location
923                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
924                },
925            )
926        }
927    }
928
929    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
930    /// which is always safe because that is the weakest possible guarantee.
931    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
932        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
933        self.assume_ordering::<NoOrder>(nondet)
934    }
935
936    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
937    /// enforcing that `O2` is weaker than the input ordering guarantee.
938    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
939        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
940        self.assume_ordering::<O2>(nondet)
941    }
942
943    /// Explicitly "casts" the stream to a type with a different retries
944    /// guarantee. Useful in unsafe code where the lack of retries cannot
945    /// be proven by the type-system.
946    ///
947    /// # Non-Determinism
948    /// This function is used as an escape hatch, and any mistakes in the
949    /// provided retries guarantee will propagate into the guarantees
950    /// for the rest of the program.
951    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
952        if R::RETRIES_KIND == R2::RETRIES_KIND {
953            Stream::new(self.location, self.ir_node.into_inner())
954        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
955            // We can always weaken the retries guarantee
956            Stream::new(
957                self.location.clone(),
958                HydroNode::Cast {
959                    inner: Box::new(self.ir_node.into_inner()),
960                    metadata: self
961                        .location
962                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
963                },
964            )
965        } else {
966            Stream::new(
967                self.location.clone(),
968                HydroNode::ObserveNonDet {
969                    inner: Box::new(self.ir_node.into_inner()),
970                    metadata: self
971                        .location
972                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
973                },
974            )
975        }
976    }
977
978    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
979    /// which is always safe because that is the weakest possible guarantee.
980    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
981        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
982        self.assume_retries::<AtLeastOnce>(nondet)
983    }
984
985    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
986    /// enforcing that `R2` is weaker than the input retries guarantee.
987    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
988        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
989        self.assume_retries::<R2>(nondet)
990    }
991}
992
993impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
994where
995    L: Location<'a>,
996{
997    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
998    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
999    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1000        self.assume_retries(
1001            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1002        )
1003    }
1004}
1005
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Turns a stream of references into a stream of owned values by cloning each element;
    /// akin to `map(q!(|d| d.clone()))`. The boundedness, ordering, and retries markers of
    /// the stream are carried over unchanged.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Implemented directly on top of `map`, with a staged closure that clones the referent.
        self.map(q!(|d| d.clone()))
    }
}
1032
1033impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1034where
1035    L: Location<'a>,
1036{
1037    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1038    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1039    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1040    ///
1041    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1042    /// and there may be duplicates.
1043    ///
1044    /// # Example
1045    /// ```rust
1046    /// # use hydro_lang::prelude::*;
1047    /// # use futures::StreamExt;
1048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049    /// let tick = process.tick();
1050    /// let bools = process.source_iter(q!(vec![false, true, false]));
1051    /// let batch = bools.batch(&tick, nondet!(/** test */));
1052    /// batch
1053    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1054    ///     .all_ticks()
1055    /// # }, |mut stream| async move {
1056    /// // true
1057    /// # assert_eq!(stream.next().await.unwrap(), true);
1058    /// # }));
1059    /// ```
1060    pub fn fold_commutative_idempotent<A, I, F>(
1061        self,
1062        init: impl IntoQuotedMut<'a, I, L>,
1063        comb: impl IntoQuotedMut<'a, F, L>,
1064    ) -> Singleton<A, L, B>
1065    where
1066        I: Fn() -> A + 'a,
1067        F: Fn(&mut A, T),
1068    {
1069        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1070        self.assume_ordering(nondet)
1071            .assume_retries(nondet)
1072            .fold(init, comb)
1073    }
1074
1075    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1076    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1077    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1078    /// reference, so that it can be modified in place.
1079    ///
1080    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1081    /// and there may be duplicates.
1082    ///
1083    /// # Example
1084    /// ```rust
1085    /// # use hydro_lang::prelude::*;
1086    /// # use futures::StreamExt;
1087    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1088    /// let tick = process.tick();
1089    /// let bools = process.source_iter(q!(vec![false, true, false]));
1090    /// let batch = bools.batch(&tick, nondet!(/** test */));
1091    /// batch
1092    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1093    ///     .all_ticks()
1094    /// # }, |mut stream| async move {
1095    /// // true
1096    /// # assert_eq!(stream.next().await.unwrap(), true);
1097    /// # }));
1098    /// ```
1099    pub fn reduce_commutative_idempotent<F>(
1100        self,
1101        comb: impl IntoQuotedMut<'a, F, L>,
1102    ) -> Optional<T, L, B>
1103    where
1104        F: Fn(&mut T, T) + 'a,
1105    {
1106        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1107        self.assume_ordering(nondet)
1108            .assume_retries(nondet)
1109            .reduce(comb)
1110    }
1111
1112    /// Computes the maximum element in the stream as an [`Optional`], which
1113    /// will be empty until the first element in the input arrives.
1114    ///
1115    /// # Example
1116    /// ```rust
1117    /// # use hydro_lang::prelude::*;
1118    /// # use futures::StreamExt;
1119    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1120    /// let tick = process.tick();
1121    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1122    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1123    /// batch.max().all_ticks()
1124    /// # }, |mut stream| async move {
1125    /// // 4
1126    /// # assert_eq!(stream.next().await.unwrap(), 4);
1127    /// # }));
1128    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Keeping the larger of two values is both commutative and idempotent, which is the
        // contract `reduce_commutative_idempotent` asks of its combinator. The accumulator is
        // only overwritten when the incoming element is strictly greater.
        self.reduce_commutative_idempotent(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }
1139
1140    /// Computes the minimum element in the stream as an [`Optional`], which
1141    /// will be empty until the first element in the input arrives.
1142    ///
1143    /// # Example
1144    /// ```rust
1145    /// # use hydro_lang::prelude::*;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// let tick = process.tick();
1149    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1150    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1151    /// batch.min().all_ticks()
1152    /// # }, |mut stream| async move {
1153    /// // 1
1154    /// # assert_eq!(stream.next().await.unwrap(), 1);
1155    /// # }));
1156    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Keeping the smaller of two values is both commutative and idempotent, which is the
        // contract `reduce_commutative_idempotent` asks of its combinator. The accumulator is
        // only overwritten when the incoming element is strictly smaller.
        self.reduce_commutative_idempotent(q!(|curr, new| {
            if new < *curr {
                *curr = new;
            }
        }))
    }
1167}
1168
1169impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1170where
1171    L: Location<'a>,
1172{
1173    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1174    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1175    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1176    ///
1177    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1178    ///
1179    /// # Example
1180    /// ```rust
1181    /// # use hydro_lang::prelude::*;
1182    /// # use futures::StreamExt;
1183    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1184    /// let tick = process.tick();
1185    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1186    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1187    /// batch
1188    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1189    ///     .all_ticks()
1190    /// # }, |mut stream| async move {
1191    /// // 10
1192    /// # assert_eq!(stream.next().await.unwrap(), 10);
1193    /// # }));
1194    /// ```
1195    pub fn fold_commutative<A, I, F>(
1196        self,
1197        init: impl IntoQuotedMut<'a, I, L>,
1198        comb: impl IntoQuotedMut<'a, F, L>,
1199    ) -> Singleton<A, L, B>
1200    where
1201        I: Fn() -> A + 'a,
1202        F: Fn(&mut A, T),
1203    {
1204        let nondet = nondet!(/** the combinator function is commutative */);
1205        self.assume_ordering(nondet).fold(init, comb)
1206    }
1207
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1209    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1210    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1211    /// reference, so that it can be modified in place.
1212    ///
1213    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1214    ///
1215    /// # Example
1216    /// ```rust
1217    /// # use hydro_lang::prelude::*;
1218    /// # use futures::StreamExt;
1219    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1220    /// let tick = process.tick();
1221    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1222    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1223    /// batch
1224    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1225    ///     .all_ticks()
1226    /// # }, |mut stream| async move {
1227    /// // 10
1228    /// # assert_eq!(stream.next().await.unwrap(), 10);
1229    /// # }));
1230    /// ```
1231    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1232    where
1233        F: Fn(&mut T, T) + 'a,
1234    {
1235        let nondet = nondet!(/** the combinator function is commutative */);
1236        self.assume_ordering(nondet).reduce(comb)
1237    }
1238
1239    /// Computes the number of elements in the stream as a [`Singleton`].
1240    ///
1241    /// # Example
1242    /// ```rust
1243    /// # use hydro_lang::prelude::*;
1244    /// # use futures::StreamExt;
1245    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1246    /// let tick = process.tick();
1247    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1248    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1249    /// batch.count().all_ticks()
1250    /// # }, |mut stream| async move {
1251    /// // 4
1252    /// # assert_eq!(stream.next().await.unwrap(), 4);
1253    /// # }));
1254    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        // Starts from zero and adds one per element, ignoring the element's value. Incrementing
        // is commutative, so the commutative fold can be used on streams of any ordering.
        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
    }
1258}
1259
1260impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1261where
1262    L: Location<'a>,
1263{
1264    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1265    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1266    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1267    ///
1268    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # use hydro_lang::prelude::*;
1273    /// # use futures::StreamExt;
1274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1275    /// let tick = process.tick();
1276    /// let bools = process.source_iter(q!(vec![false, true, false]));
1277    /// let batch = bools.batch(&tick, nondet!(/** test */));
1278    /// batch
1279    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1280    ///     .all_ticks()
1281    /// # }, |mut stream| async move {
1282    /// // true
1283    /// # assert_eq!(stream.next().await.unwrap(), true);
1284    /// # }));
1285    /// ```
1286    pub fn fold_idempotent<A, I, F>(
1287        self,
1288        init: impl IntoQuotedMut<'a, I, L>,
1289        comb: impl IntoQuotedMut<'a, F, L>,
1290    ) -> Singleton<A, L, B>
1291    where
1292        I: Fn() -> A + 'a,
1293        F: Fn(&mut A, T),
1294    {
1295        let nondet = nondet!(/** the combinator function is idempotent */);
1296        self.assume_retries(nondet).fold(init, comb)
1297    }
1298
1299    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1300    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1301    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1302    /// reference, so that it can be modified in place.
1303    ///
1304    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1305    ///
1306    /// # Example
1307    /// ```rust
1308    /// # use hydro_lang::prelude::*;
1309    /// # use futures::StreamExt;
1310    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1311    /// let tick = process.tick();
1312    /// let bools = process.source_iter(q!(vec![false, true, false]));
1313    /// let batch = bools.batch(&tick, nondet!(/** test */));
1314    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1315    /// # }, |mut stream| async move {
1316    /// // true
1317    /// # assert_eq!(stream.next().await.unwrap(), true);
1318    /// # }));
1319    /// ```
1320    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1321    where
1322        F: Fn(&mut T, T) + 'a,
1323    {
1324        let nondet = nondet!(/** the combinator function is idempotent */);
1325        self.assume_retries(nondet).reduce(comb)
1326    }
1327
1328    /// Computes the first element in the stream as an [`Optional`], which
1329    /// will be empty until the first element in the input arrives.
1330    ///
1331    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1332    /// re-ordering of elements may cause the first element to change.
1333    ///
1334    /// # Example
1335    /// ```rust
1336    /// # use hydro_lang::prelude::*;
1337    /// # use futures::StreamExt;
1338    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1339    /// let tick = process.tick();
1340    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1341    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1342    /// batch.first().all_ticks()
1343    /// # }, |mut stream| async move {
1344    /// // 1
1345    /// # assert_eq!(stream.next().await.unwrap(), 1);
1346    /// # }));
1347    /// ```
    pub fn first(self) -> Optional<T, L, B> {
        // The combinator deliberately does nothing: the reduction is seeded with the first
        // element and every later element is discarded, so the seed is the result.
        self.reduce_idempotent(q!(|_, _| {}))
    }
1351
1352    /// Computes the last element in the stream as an [`Optional`], which
1353    /// will be empty until an element in the input arrives.
1354    ///
1355    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1356    /// re-ordering of elements may cause the last element to change.
1357    ///
1358    /// # Example
1359    /// ```rust
1360    /// # use hydro_lang::prelude::*;
1361    /// # use futures::StreamExt;
1362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1363    /// let tick = process.tick();
1364    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1365    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1366    /// batch.last().all_ticks()
1367    /// # }, |mut stream| async move {
1368    /// // 4
1369    /// # assert_eq!(stream.next().await.unwrap(), 4);
1370    /// # }));
1371    /// ```
    pub fn last(self) -> Optional<T, L, B> {
        // Every incoming element overwrites the accumulator, so the value left at the end is
        // the most recently seen element.
        self.reduce_idempotent(q!(|curr, new| *curr = new))
    }
1375}
1376
1377impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1378where
1379    L: Location<'a>,
1380{
1381    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1382    ///
1383    /// # Example
1384    /// ```rust
1385    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1386    /// # use futures::StreamExt;
1387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1388    /// let tick = process.tick();
1389    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1390    /// numbers.enumerate()
1391    /// # }, |mut stream| async move {
1392    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1393    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1394    /// #     assert_eq!(stream.next().await.unwrap(), w);
1395    /// # }
1396    /// # }));
1397    /// ```
1398    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1399        Stream::new(
1400            self.location.clone(),
1401            HydroNode::Enumerate {
1402                input: Box::new(self.ir_node.into_inner()),
1403                metadata: self.location.new_node_metadata(Stream::<
1404                    (usize, T),
1405                    L,
1406                    B,
1407                    TotalOrder,
1408                    ExactlyOnce,
1409                >::collection_kind()),
1410            },
1411        )
1412    }
1413
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1415    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1416    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1417    ///
1418    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1419    /// to depend on the order of elements in the stream.
1420    ///
1421    /// # Example
1422    /// ```rust
1423    /// # use hydro_lang::prelude::*;
1424    /// # use futures::StreamExt;
1425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1426    /// let tick = process.tick();
1427    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1428    /// let batch = words.batch(&tick, nondet!(/** test */));
1429    /// batch
1430    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1431    ///     .all_ticks()
1432    /// # }, |mut stream| async move {
1433    /// // "HELLOWORLD"
1434    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1435    /// # }));
1436    /// ```
1437    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1438        self,
1439        init: impl IntoQuotedMut<'a, I, L>,
1440        comb: impl IntoQuotedMut<'a, F, L>,
1441    ) -> Singleton<A, L, B> {
1442        let init = init.splice_fn0_ctx(&self.location).into();
1443        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1444
1445        let core = HydroNode::Fold {
1446            init,
1447            acc: comb,
1448            input: Box::new(self.ir_node.into_inner()),
1449            metadata: self
1450                .location
1451                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1452        };
1453
1454        Singleton::new(self.location, core)
1455    }
1456
1457    /// Collects all the elements of this stream into a single [`Vec`] element.
1458    ///
1459    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1460    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1461    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1462    /// the vector at an arbitrary point in time.
1463    ///
1464    /// # Example
1465    /// ```rust
1466    /// # use hydro_lang::prelude::*;
1467    /// # use futures::StreamExt;
1468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1469    /// let tick = process.tick();
1470    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1471    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1472    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1473    /// # }, |mut stream| async move {
1474    /// // [ vec![1, 2, 3, 4] ]
1475    /// # for w in vec![vec![1, 2, 3, 4]] {
1476    /// #     assert_eq!(stream.next().await.unwrap(), w);
1477    /// # }
1478    /// # }));
1479    /// ```
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
        // A plain `fold`: start from an empty vector and push each element as it is folded in.
        self.fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }
1488
1489    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1490    /// and emitting each intermediate result.
1491    ///
1492    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1493    /// containing all intermediate accumulated values. The scan operation can also terminate early
1494    /// by returning `None`.
1495    ///
1496    /// The function takes a mutable reference to the accumulator and the current element, and returns
1497    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1498    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1499    ///
1500    /// # Examples
1501    ///
1502    /// Basic usage - running sum:
1503    /// ```rust
1504    /// # use hydro_lang::prelude::*;
1505    /// # use futures::StreamExt;
1506    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1508    ///     q!(|| 0),
1509    ///     q!(|acc, x| {
1510    ///         *acc += x;
1511    ///         Some(*acc)
1512    ///     }),
1513    /// )
1514    /// # }, |mut stream| async move {
1515    /// // Output: 1, 3, 6, 10
1516    /// # for w in vec![1, 3, 6, 10] {
1517    /// #     assert_eq!(stream.next().await.unwrap(), w);
1518    /// # }
1519    /// # }));
1520    /// ```
1521    ///
1522    /// Early termination example:
1523    /// ```rust
1524    /// # use hydro_lang::prelude::*;
1525    /// # use futures::StreamExt;
1526    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1527    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1528    ///     q!(|| 1),
1529    ///     q!(|state, x| {
1530    ///         *state = *state * x;
1531    ///         if *state > 6 {
1532    ///             None // Terminate the stream
1533    ///         } else {
1534    ///             Some(-*state)
1535    ///         }
1536    ///     }),
1537    /// )
1538    /// # }, |mut stream| async move {
1539    /// // Output: -1, -2, -6
1540    /// # for w in vec![-1, -2, -6] {
1541    /// #     assert_eq!(stream.next().await.unwrap(), w);
1542    /// # }
1543    /// # }));
1544    /// ```
1545    pub fn scan<A, U, I, F>(
1546        self,
1547        init: impl IntoQuotedMut<'a, I, L>,
1548        f: impl IntoQuotedMut<'a, F, L>,
1549    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1550    where
1551        I: Fn() -> A + 'a,
1552        F: Fn(&mut A, T) -> Option<U> + 'a,
1553    {
1554        let init = init.splice_fn0_ctx(&self.location).into();
1555        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1556
1557        Stream::new(
1558            self.location.clone(),
1559            HydroNode::Scan {
1560                init,
1561                acc: f,
1562                input: Box::new(self.ir_node.into_inner()),
1563                metadata: self.location.new_node_metadata(
1564                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1565                ),
1566            },
1567        )
1568    }
1569
1570    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1571    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1572    /// until the first element in the input arrives.
1573    ///
1574    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1575    /// to depend on the order of elements in the stream.
1576    ///
1577    /// # Example
1578    /// ```rust
1579    /// # use hydro_lang::prelude::*;
1580    /// # use futures::StreamExt;
1581    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1582    /// let tick = process.tick();
1583    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1584    /// let batch = words.batch(&tick, nondet!(/** test */));
1585    /// batch
1586    ///     .map(q!(|x| x.to_string()))
1587    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1588    ///     .all_ticks()
1589    /// # }, |mut stream| async move {
1590    /// // "HELLOWORLD"
1591    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1592    /// # }));
1593    /// ```
1594    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1595        self,
1596        comb: impl IntoQuotedMut<'a, F, L>,
1597    ) -> Optional<T, L, B> {
1598        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1599        let core = HydroNode::Reduce {
1600            f,
1601            input: Box::new(self.ir_node.into_inner()),
1602            metadata: self
1603                .location
1604                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1605        };
1606
1607        Optional::new(self.location, core)
1608    }
1609}
1610
1611impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1612    /// Produces a new stream that interleaves the elements of the two input streams.
1613    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1614    ///
1615    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1616    /// [`Bounded`], you can use [`Stream::chain`] instead.
1617    ///
1618    /// # Example
1619    /// ```rust
1620    /// # use hydro_lang::prelude::*;
1621    /// # use futures::StreamExt;
1622    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1623    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1624    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1625    /// # }, |mut stream| async move {
1626    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1627    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1628    /// #     assert_eq!(stream.next().await.unwrap(), w);
1629    /// # }
1630    /// # }));
1631    /// ```
1632    pub fn interleave<O2: Ordering, R2: Retries>(
1633        self,
1634        other: Stream<T, L, Unbounded, O2, R2>,
1635    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1636    where
1637        R: MinRetries<R2>,
1638    {
1639        let tick = self.location.tick();
1640        // Because the outputs are unordered, we can interleave batches from both streams.
1641        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1642        self.batch(&tick, nondet_batch_interleaving)
1643            .weakest_ordering()
1644            .chain(
1645                other
1646                    .batch(&tick, nondet_batch_interleaving)
1647                    .weakest_ordering(),
1648            )
1649            .all_ticks()
1650    }
1651}
1652
1653impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1654where
1655    L: Location<'a>,
1656{
1657    /// Produces a new stream that emits the input elements in sorted order.
1658    ///
1659    /// The input stream can have any ordering guarantee, but the output stream
1660    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1661    /// elements in the input stream are available, so it requires the input stream
1662    /// to be [`Bounded`].
1663    ///
1664    /// # Example
1665    /// ```rust
1666    /// # use hydro_lang::prelude::*;
1667    /// # use futures::StreamExt;
1668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669    /// let tick = process.tick();
1670    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1671    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672    /// batch.sort().all_ticks()
1673    /// # }, |mut stream| async move {
1674    /// // 1, 2, 3, 4
1675    /// # for w in (1..5) {
1676    /// #     assert_eq!(stream.next().await.unwrap(), w);
1677    /// # }
1678    /// # }));
1679    /// ```
1680    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1681    where
1682        T: Ord,
1683    {
1684        Stream::new(
1685            self.location.clone(),
1686            HydroNode::Sort {
1687                input: Box::new(self.ir_node.into_inner()),
1688                metadata: self
1689                    .location
1690                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1691            },
1692        )
1693    }
1694
1695    /// Produces a new stream that first emits the elements of the `self` stream,
1696    /// and then emits the elements of the `other` stream. The output stream has
1697    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1698    /// [`TotalOrder`] guarantee.
1699    ///
1700    /// Currently, both input streams must be [`Bounded`]. This operator will block
1701    /// on the first stream until all its elements are available. In a future version,
1702    /// we will relax the requirement on the `other` stream.
1703    ///
1704    /// # Example
1705    /// ```rust
1706    /// # use hydro_lang::prelude::*;
1707    /// # use futures::StreamExt;
1708    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1709    /// let tick = process.tick();
1710    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1711    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1712    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1713    /// # }, |mut stream| async move {
1714    /// // 2, 3, 4, 5, 1, 2, 3, 4
1715    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1716    /// #     assert_eq!(stream.next().await.unwrap(), w);
1717    /// # }
1718    /// # }));
1719    /// ```
1720    pub fn chain<O2: Ordering, R2: Retries>(
1721        self,
1722        other: Stream<T, L, Bounded, O2, R2>,
1723    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1724    where
1725        O: MinOrder<O2>,
1726        R: MinRetries<R2>,
1727    {
1728        check_matching_location(&self.location, &other.location);
1729
1730        Stream::new(
1731            self.location.clone(),
1732            HydroNode::Chain {
1733                first: Box::new(self.ir_node.into_inner()),
1734                second: Box::new(other.ir_node.into_inner()),
1735                metadata: self.location.new_node_metadata(Stream::<
1736                    T,
1737                    L,
1738                    Bounded,
1739                    <O as MinOrder<O2>>::Min,
1740                    <R as MinRetries<R2>>::Min,
1741                >::collection_kind()),
1742            },
1743        )
1744    }
1745
1746    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1747    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1748    /// because this is compiled into a nested loop.
1749    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1750        self,
1751        other: Stream<T2, L, Bounded, O2, R>,
1752    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1753    where
1754        T: Clone,
1755        T2: Clone,
1756    {
1757        check_matching_location(&self.location, &other.location);
1758
1759        Stream::new(
1760            self.location.clone(),
1761            HydroNode::CrossProduct {
1762                left: Box::new(self.ir_node.into_inner()),
1763                right: Box::new(other.ir_node.into_inner()),
1764                metadata: self.location.new_node_metadata(Stream::<
1765                    (T, T2),
1766                    L,
1767                    Bounded,
1768                    <O2 as MinOrder<O>>::Min,
1769                    R,
1770                >::collection_kind()),
1771            },
1772        )
1773    }
1774
1775    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1776    /// `self` used as the values for *each* key.
1777    ///
1778    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1779    /// values. For example, it can be used to send the same set of elements to several cluster
1780    /// members, if the membership information is available as a [`KeyedSingleton`].
1781    ///
1782    /// # Example
1783    /// ```rust
1784    /// # use hydro_lang::prelude::*;
1785    /// # use futures::StreamExt;
1786    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1787    /// # let tick = process.tick();
1788    /// let keyed_singleton = // { 1: (), 2: () }
1789    /// # process
1790    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1791    /// #     .into_keyed()
1792    /// #     .batch(&tick, nondet!(/** test */))
1793    /// #     .first();
1794    /// let stream = // [ "a", "b" ]
1795    /// # process
1796    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1797    /// #     .batch(&tick, nondet!(/** test */));
1798    /// stream.repeat_with_keys(keyed_singleton)
1799    /// # .entries().all_ticks()
1800    /// # }, |mut stream| async move {
1801    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1802    /// # let mut results = Vec::new();
1803    /// # for _ in 0..4 {
1804    /// #     results.push(stream.next().await.unwrap());
1805    /// # }
1806    /// # results.sort();
1807    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1808    /// # }));
1809    /// ```
1810    pub fn repeat_with_keys<K, V2>(
1811        self,
1812        keys: KeyedSingleton<K, V2, L, Bounded>,
1813    ) -> KeyedStream<K, T, L, Bounded, O, R>
1814    where
1815        K: Clone,
1816        T: Clone,
1817    {
1818        keys.keys()
1819            .weaken_retries()
1820            .assume_ordering::<TotalOrder>(
1821                nondet!(/** keyed stream does not depend on ordering of keys */),
1822            )
1823            .cross_product_nested_loop(self)
1824            .into_keyed()
1825    }
1826}
1827
1828impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1829where
1830    L: Location<'a>,
1831{
1832    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1833    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1834    /// by equi-joining the two streams on the key attribute `K`.
1835    ///
1836    /// # Example
1837    /// ```rust
1838    /// # use hydro_lang::prelude::*;
1839    /// # use std::collections::HashSet;
1840    /// # use futures::StreamExt;
1841    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1842    /// let tick = process.tick();
1843    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1844    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1845    /// stream1.join(stream2)
1846    /// # }, |mut stream| async move {
1847    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1848    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1849    /// # stream.map(|i| assert!(expected.contains(&i)));
1850    /// # }));
1851    pub fn join<V2, O2: Ordering, R2: Retries>(
1852        self,
1853        n: Stream<(K, V2), L, B, O2, R2>,
1854    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1855    where
1856        K: Eq + Hash,
1857        R: MinRetries<R2>,
1858    {
1859        check_matching_location(&self.location, &n.location);
1860
1861        Stream::new(
1862            self.location.clone(),
1863            HydroNode::Join {
1864                left: Box::new(self.ir_node.into_inner()),
1865                right: Box::new(n.ir_node.into_inner()),
1866                metadata: self.location.new_node_metadata(Stream::<
1867                    (K, (V1, V2)),
1868                    L,
1869                    B,
1870                    NoOrder,
1871                    <R as MinRetries<R2>>::Min,
1872                >::collection_kind()),
1873            },
1874        )
1875    }
1876
1877    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1878    /// computes the anti-join of the items in the input -- i.e. returns
1879    /// unique items in the first input that do not have a matching key
1880    /// in the second input.
1881    ///
1882    /// # Example
1883    /// ```rust
1884    /// # use hydro_lang::prelude::*;
1885    /// # use futures::StreamExt;
1886    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1887    /// let tick = process.tick();
1888    /// let stream = process
1889    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1890    ///   .batch(&tick, nondet!(/** test */));
1891    /// let batch = process
1892    ///   .source_iter(q!(vec![1, 2]))
1893    ///   .batch(&tick, nondet!(/** test */));
1894    /// stream.anti_join(batch).all_ticks()
1895    /// # }, |mut stream| async move {
1896    /// # for w in vec![(3, 'c'), (4, 'd')] {
1897    /// #     assert_eq!(stream.next().await.unwrap(), w);
1898    /// # }
1899    /// # }));
1900    pub fn anti_join<O2: Ordering, R2: Retries>(
1901        self,
1902        n: Stream<K, L, Bounded, O2, R2>,
1903    ) -> Stream<(K, V1), L, B, O, R>
1904    where
1905        K: Eq + Hash,
1906    {
1907        check_matching_location(&self.location, &n.location);
1908
1909        Stream::new(
1910            self.location.clone(),
1911            HydroNode::AntiJoin {
1912                pos: Box::new(self.ir_node.into_inner()),
1913                neg: Box::new(n.ir_node.into_inner()),
1914                metadata: self
1915                    .location
1916                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1917            },
1918        )
1919    }
1920}
1921
1922impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1923    Stream<(K, V), L, B, O, R>
1924{
1925    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1926    /// is used as the key and the second element is added to the entries associated with that key.
1927    ///
1928    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1929    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1930    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1931    /// total ordering _within_ each group but no ordering _across_ groups.
1932    ///
1933    /// # Example
1934    /// ```rust
1935    /// # use hydro_lang::prelude::*;
1936    /// # use futures::StreamExt;
1937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1938    /// process
1939    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1940    ///     .into_keyed()
1941    /// #   .entries()
1942    /// # }, |mut stream| async move {
1943    /// // { 1: [2, 3], 2: [4] }
1944    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1945    /// #     assert_eq!(stream.next().await.unwrap(), w);
1946    /// # }
1947    /// # }));
1948    /// ```
1949    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1950        KeyedStream::new(
1951            self.location.clone(),
1952            HydroNode::Cast {
1953                inner: Box::new(self.ir_node.into_inner()),
1954                metadata: self
1955                    .location
1956                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1957            },
1958        )
1959    }
1960}
1961
1962impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1963where
1964    K: Eq + Hash,
1965    L: Location<'a>,
1966{
1967    #[deprecated = "use .into_keyed().fold(...) instead"]
1968    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1969    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1970    /// in the second element are accumulated via the `comb` closure.
1971    ///
1972    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1973    /// to depend on the order of elements in the stream.
1974    ///
1975    /// If the input and output value types are the same and do not require initialization then use
1976    /// [`Stream::reduce_keyed`].
1977    ///
1978    /// # Example
1979    /// ```rust
1980    /// # use hydro_lang::prelude::*;
1981    /// # use futures::StreamExt;
1982    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1983    /// let tick = process.tick();
1984    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1985    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1986    /// batch
1987    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1988    ///     .all_ticks()
1989    /// # }, |mut stream| async move {
1990    /// // (1, 5), (2, 7)
1991    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1992    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1993    /// # }));
1994    /// ```
1995    pub fn fold_keyed<A, I, F>(
1996        self,
1997        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1998        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1999    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2000    where
2001        I: Fn() -> A + 'a,
2002        F: Fn(&mut A, V) + 'a,
2003    {
2004        self.into_keyed().fold(init, comb).entries()
2005    }
2006
2007    #[deprecated = "use .into_keyed().reduce(...) instead"]
2008    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2009    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2010    /// in the second element are accumulated via the `comb` closure.
2011    ///
2012    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2013    /// to depend on the order of elements in the stream.
2014    ///
2015    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2016    ///
2017    /// # Example
2018    /// ```rust
2019    /// # use hydro_lang::prelude::*;
2020    /// # use futures::StreamExt;
2021    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2022    /// let tick = process.tick();
2023    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2024    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2025    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2026    /// # }, |mut stream| async move {
2027    /// // (1, 5), (2, 7)
2028    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2029    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2030    /// # }));
2031    /// ```
2032    pub fn reduce_keyed<F>(
2033        self,
2034        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2035    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2036    where
2037        F: Fn(&mut V, V) + 'a,
2038    {
2039        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2040
2041        Stream::new(
2042            self.location.clone(),
2043            HydroNode::ReduceKeyed {
2044                f,
2045                input: Box::new(self.ir_node.into_inner()),
2046                metadata: self.location.new_node_metadata(Stream::<
2047                    (K, V),
2048                    Tick<L>,
2049                    Bounded,
2050                    NoOrder,
2051                    ExactlyOnce,
2052                >::collection_kind()),
2053            },
2054        )
2055    }
2056}
2057
2058impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2059where
2060    K: Eq + Hash,
2061    L: Location<'a>,
2062{
2063    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2064    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2065    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2066    /// in the second element are accumulated via the `comb` closure.
2067    ///
2068    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2069    /// as there may be non-deterministic duplicates.
2070    ///
2071    /// If the input and output value types are the same and do not require initialization then use
2072    /// [`Stream::reduce_keyed_commutative_idempotent`].
2073    ///
2074    /// # Example
2075    /// ```rust
2076    /// # use hydro_lang::prelude::*;
2077    /// # use futures::StreamExt;
2078    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2079    /// let tick = process.tick();
2080    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2081    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2082    /// batch
2083    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2084    ///     .all_ticks()
2085    /// # }, |mut stream| async move {
2086    /// // (1, false), (2, true)
2087    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2088    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2089    /// # }));
2090    /// ```
2091    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2092        self,
2093        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2094        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2095    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2096    where
2097        I: Fn() -> A + 'a,
2098        F: Fn(&mut A, V) + 'a,
2099    {
2100        self.into_keyed()
2101            .fold_commutative_idempotent(init, comb)
2102            .entries()
2103    }
2104
2105    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2106    /// # Example
2107    /// ```rust
2108    /// # use hydro_lang::prelude::*;
2109    /// # use futures::StreamExt;
2110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2111    /// let tick = process.tick();
2112    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2113    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2114    /// batch.keys().all_ticks()
2115    /// # }, |mut stream| async move {
2116    /// // 1, 2
2117    /// # assert_eq!(stream.next().await.unwrap(), 1);
2118    /// # assert_eq!(stream.next().await.unwrap(), 2);
2119    /// # }));
2120    /// ```
2121    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2122        self.into_keyed()
2123            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2124            .keys()
2125    }
2126
2127    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2128    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2129    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2130    /// in the second element are accumulated via the `comb` closure.
2131    ///
2132    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2133    /// as there may be non-deterministic duplicates.
2134    ///
2135    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2136    ///
2137    /// # Example
2138    /// ```rust
2139    /// # use hydro_lang::prelude::*;
2140    /// # use futures::StreamExt;
2141    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2142    /// let tick = process.tick();
2143    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2144    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2145    /// batch
2146    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2147    ///     .all_ticks()
2148    /// # }, |mut stream| async move {
2149    /// // (1, false), (2, true)
2150    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2151    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2152    /// # }));
2153    /// ```
2154    pub fn reduce_keyed_commutative_idempotent<F>(
2155        self,
2156        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2157    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2158    where
2159        F: Fn(&mut V, V) + 'a,
2160    {
2161        self.into_keyed()
2162            .reduce_commutative_idempotent(comb)
2163            .entries()
2164    }
2165}
2166
2167impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2168where
2169    K: Eq + Hash,
2170    L: Location<'a>,
2171{
2172    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2173    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2174    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2175    /// in the second element are accumulated via the `comb` closure.
2176    ///
2177    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2178    ///
2179    /// If the input and output value types are the same and do not require initialization then use
2180    /// [`Stream::reduce_keyed_commutative`].
2181    ///
2182    /// # Example
2183    /// ```rust
2184    /// # use hydro_lang::prelude::*;
2185    /// # use futures::StreamExt;
2186    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2187    /// let tick = process.tick();
2188    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2189    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2190    /// batch
2191    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2192    ///     .all_ticks()
2193    /// # }, |mut stream| async move {
2194    /// // (1, 5), (2, 7)
2195    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2196    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2197    /// # }));
2198    /// ```
2199    pub fn fold_keyed_commutative<A, I, F>(
2200        self,
2201        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2202        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2203    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2204    where
2205        I: Fn() -> A + 'a,
2206        F: Fn(&mut A, V) + 'a,
2207    {
2208        self.into_keyed().fold_commutative(init, comb).entries()
2209    }
2210
2211    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2212    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2213    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2214    /// in the second element are accumulated via the `comb` closure.
2215    ///
2216    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2217    ///
2218    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2219    ///
2220    /// # Example
2221    /// ```rust
2222    /// # use hydro_lang::prelude::*;
2223    /// # use futures::StreamExt;
2224    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2225    /// let tick = process.tick();
2226    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2227    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2228    /// batch
2229    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2230    ///     .all_ticks()
2231    /// # }, |mut stream| async move {
2232    /// // (1, 5), (2, 7)
2233    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2234    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2235    /// # }));
2236    /// ```
2237    pub fn reduce_keyed_commutative<F>(
2238        self,
2239        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2240    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2241    where
2242        F: Fn(&mut V, V) + 'a,
2243    {
2244        self.into_keyed().reduce_commutative(comb).entries()
2245    }
2246}
2247
2248impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2249where
2250    K: Eq + Hash,
2251    L: Location<'a>,
2252{
2253    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2254    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2255    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2256    /// in the second element are accumulated via the `comb` closure.
2257    ///
2258    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2259    ///
2260    /// If the input and output value types are the same and do not require initialization then use
2261    /// [`Stream::reduce_keyed_idempotent`].
2262    ///
2263    /// # Example
2264    /// ```rust
2265    /// # use hydro_lang::prelude::*;
2266    /// # use futures::StreamExt;
2267    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2268    /// let tick = process.tick();
2269    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2270    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2271    /// batch
2272    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2273    ///     .all_ticks()
2274    /// # }, |mut stream| async move {
2275    /// // (1, false), (2, true)
2276    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2277    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2278    /// # }));
2279    /// ```
2280    pub fn fold_keyed_idempotent<A, I, F>(
2281        self,
2282        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2283        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2284    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2285    where
2286        I: Fn() -> A + 'a,
2287        F: Fn(&mut A, V) + 'a,
2288    {
2289        self.into_keyed().fold_idempotent(init, comb).entries()
2290    }
2291
2292    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2293    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2294    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2295    /// in the second element are accumulated via the `comb` closure.
2296    ///
2297    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2298    ///
2299    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2300    ///
2301    /// # Example
2302    /// ```rust
2303    /// # use hydro_lang::prelude::*;
2304    /// # use futures::StreamExt;
2305    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2306    /// let tick = process.tick();
2307    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2308    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2309    /// batch
2310    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2311    ///     .all_ticks()
2312    /// # }, |mut stream| async move {
2313    /// // (1, false), (2, true)
2314    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2315    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2316    /// # }));
2317    /// ```
2318    pub fn reduce_keyed_idempotent<F>(
2319        self,
2320        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2321    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2322    where
2323        F: Fn(&mut V, V) + 'a,
2324    {
2325        self.into_keyed().reduce_idempotent(comb).entries()
2326    }
2327}
2328
2329impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2330where
2331    L: Location<'a> + NoTick,
2332{
2333    /// Returns a stream corresponding to the latest batch of elements being atomically
2334    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2335    /// the order of the input.
2336    ///
2337    /// # Non-Determinism
2338    /// The batch boundaries are non-deterministic and may change across executions.
2339    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2340        Stream::new(
2341            self.location.clone().tick,
2342            HydroNode::Batch {
2343                inner: Box::new(self.ir_node.into_inner()),
2344                metadata: self
2345                    .location
2346                    .tick
2347                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2348            },
2349        )
2350    }
2351
2352    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2353    /// See [`Stream::atomic`] for more details.
2354    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2355        Stream::new(
2356            self.location.tick.l.clone(),
2357            HydroNode::EndAtomic {
2358                inner: Box::new(self.ir_node.into_inner()),
2359                metadata: self
2360                    .location
2361                    .tick
2362                    .l
2363                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2364            },
2365        )
2366    }
2367}
2368
2369impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2370where
2371    L: Location<'a>,
2372{
2373    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2374    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2375    ///
2376    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2377    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2378    /// argument that declares where the stream will be atomically processed. Batching a stream into
2379    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2380    /// [`Tick`] will introduce asynchrony.
2381    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2382        let out_location = Atomic { tick: tick.clone() };
2383        Stream::new(
2384            out_location.clone(),
2385            HydroNode::BeginAtomic {
2386                inner: Box::new(self.ir_node.into_inner()),
2387                metadata: out_location
2388                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2389            },
2390        )
2391    }
2392
2393    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2394    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2395    /// the order of the input. The output stream will execute in the [`Tick`] that was
2396    /// used to create the atomic section.
2397    ///
2398    /// # Non-Determinism
2399    /// The batch boundaries are non-deterministic and may change across executions.
2400    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2401        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2402        Stream::new(
2403            tick.clone(),
2404            HydroNode::Batch {
2405                inner: Box::new(self.ir_node.into_inner()),
2406                metadata: tick
2407                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2408            },
2409        )
2410    }
2411
2412    /// Given a time interval, returns a stream corresponding to samples taken from the
2413    /// stream roughly at that interval. The output will have elements in the same order
2414    /// as the input, but with arbitrary elements skipped between samples. There is also
2415    /// no guarantee on the exact timing of the samples.
2416    ///
2417    /// # Non-Determinism
2418    /// The output stream is non-deterministic in which elements are sampled, since this
2419    /// is controlled by a clock.
2420    pub fn sample_every(
2421        self,
2422        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2423        nondet: NonDet,
2424    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2425    where
2426        L: NoTick + NoAtomic,
2427    {
2428        let samples = self.location.source_interval(interval, nondet);
2429
2430        let tick = self.location.tick();
2431        self.batch(&tick, nondet)
2432            .filter_if_some(samples.batch(&tick, nondet).first())
2433            .all_ticks()
2434            .weakest_retries()
2435    }
2436
2437    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2438    /// stream has not emitted a value since that duration.
2439    ///
2440    /// # Non-Determinism
2441    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2442    /// samples take place, timeouts may be non-deterministically generated or missed,
2443    /// and the notification of the timeout may be delayed as well. There is also no
2444    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2445    /// detected based on when the next sample is taken.
2446    pub fn timeout(
2447        self,
2448        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2449        nondet: NonDet,
2450    ) -> Optional<(), L, Unbounded>
2451    where
2452        L: NoTick + NoAtomic,
2453    {
2454        let tick = self.location.tick();
2455
2456        let latest_received = self.assume_retries(nondet).fold_commutative(
2457            q!(|| None),
2458            q!(|latest, _| {
2459                *latest = Some(Instant::now());
2460            }),
2461        );
2462
2463        latest_received
2464            .snapshot(&tick, nondet)
2465            .filter_map(q!(move |latest_received| {
2466                if let Some(latest_received) = latest_received {
2467                    if Instant::now().duration_since(latest_received) > duration {
2468                        Some(())
2469                    } else {
2470                        None
2471                    }
2472                } else {
2473                    Some(())
2474                }
2475            }))
2476            .latest()
2477    }
2478}
2479
2480impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2481where
2482    L: Location<'a> + NoTick + NoAtomic,
2483    F: Future<Output = T>,
2484{
2485    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2486    /// Future outputs are produced as available, regardless of input arrival order.
2487    ///
2488    /// # Example
2489    /// ```rust
2490    /// # use std::collections::HashSet;
2491    /// # use futures::StreamExt;
2492    /// # use hydro_lang::prelude::*;
2493    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2495    ///     .map(q!(|x| async move {
2496    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2497    ///         x
2498    ///     }))
2499    ///     .resolve_futures()
2500    /// #   },
2501    /// #   |mut stream| async move {
2502    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2503    /// #       let mut output = HashSet::new();
2504    /// #       for _ in 1..10 {
2505    /// #           output.insert(stream.next().await.unwrap());
2506    /// #       }
2507    /// #       assert_eq!(
2508    /// #           output,
2509    /// #           HashSet::<i32>::from_iter(1..10)
2510    /// #       );
2511    /// #   },
2512    /// # ));
2513    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2514        Stream::new(
2515            self.location.clone(),
2516            HydroNode::ResolveFutures {
2517                input: Box::new(self.ir_node.into_inner()),
2518                metadata: self
2519                    .location
2520                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2521            },
2522        )
2523    }
2524
2525    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2526    /// Future outputs are produced in the same order as the input stream.
2527    ///
2528    /// # Example
2529    /// ```rust
2530    /// # use std::collections::HashSet;
2531    /// # use futures::StreamExt;
2532    /// # use hydro_lang::prelude::*;
2533    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2534    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2535    ///     .map(q!(|x| async move {
2536    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2537    ///         x
2538    ///     }))
2539    ///     .resolve_futures_ordered()
2540    /// #   },
2541    /// #   |mut stream| async move {
2542    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2543    /// #       let mut output = Vec::new();
2544    /// #       for _ in 1..10 {
2545    /// #           output.push(stream.next().await.unwrap());
2546    /// #       }
2547    /// #       assert_eq!(
2548    /// #           output,
2549    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2550    /// #       );
2551    /// #   },
2552    /// # ));
2553    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2554        Stream::new(
2555            self.location.clone(),
2556            HydroNode::ResolveFuturesOrdered {
2557                input: Box::new(self.ir_node.into_inner()),
2558                metadata: self
2559                    .location
2560                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2561            },
2562        )
2563    }
2564}
2565
2566impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2567where
2568    L: Location<'a> + NoTick,
2569{
2570    /// Executes the provided closure for every element in this stream.
2571    ///
2572    /// Because the closure may have side effects, the stream must have deterministic order
2573    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2574    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2575    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2576    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2577        let f = f.splice_fn1_ctx(&self.location).into();
2578        self.location
2579            .flow_state()
2580            .borrow_mut()
2581            .push_root(HydroRoot::ForEach {
2582                input: Box::new(self.ir_node.into_inner()),
2583                f,
2584                op_metadata: HydroIrOpMetadata::new(),
2585            });
2586    }
2587
2588    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2589    /// TCP socket to some other server. You should _not_ use this API for interacting with
2590    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2591    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2592    /// interaction with asynchronous sinks.
2593    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2594    where
2595        S: 'a + futures::Sink<T> + Unpin,
2596    {
2597        self.location
2598            .flow_state()
2599            .borrow_mut()
2600            .push_root(HydroRoot::DestSink {
2601                sink: sink.splice_typed_ctx(&self.location).into(),
2602                input: Box::new(self.ir_node.into_inner()),
2603                op_metadata: HydroIrOpMetadata::new(),
2604            });
2605    }
2606}
2607
2608impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2609where
2610    L: Location<'a>,
2611{
2612    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2613    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2614    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2615        Stream::new(
2616            self.location.outer().clone(),
2617            HydroNode::YieldConcat {
2618                inner: Box::new(self.ir_node.into_inner()),
2619                metadata: self
2620                    .location
2621                    .outer()
2622                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2623            },
2624        )
2625    }
2626
2627    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2628    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2629    ///
2630    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2631    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2632    /// stream's [`Tick`] context.
2633    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2634        let out_location = Atomic {
2635            tick: self.location.clone(),
2636        };
2637
2638        Stream::new(
2639            out_location.clone(),
2640            HydroNode::YieldConcat {
2641                inner: Box::new(self.ir_node.into_inner()),
2642                metadata: out_location
2643                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2644            },
2645        )
2646    }
2647
2648    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2649    ///
2650    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2651    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2652    /// careful when using this operator, as its memory usage will grow linearly over time since it
2653    /// must store its inputs indefinitely.
2654    ///
2655    /// # Example
2656    /// ```rust
2657    /// # use hydro_lang::prelude::*;
2658    /// # use futures::StreamExt;
2659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2660    /// let tick = process.tick();
2661    /// // ticks are lazy by default, forces the second tick to run
2662    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2663    ///
2664    /// let batch_first_tick = process
2665    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2666    ///   .batch(&tick, nondet!(/** test */));
2667    /// let batch_second_tick = process
2668    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2669    ///   .batch(&tick, nondet!(/** test */))
2670    ///   .defer_tick(); // appears on the second tick
2671    /// batch_first_tick.chain(batch_second_tick)
2672    ///   .persist()
2673    ///   .all_ticks()
2674    /// # }, |mut stream| async move {
2675    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2676    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2677    /// #     assert_eq!(stream.next().await.unwrap(), w);
2678    /// # }
2679    /// # }));
2680    /// ```
2681    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2682    where
2683        T: Clone,
2684    {
2685        Stream::new(
2686            self.location.clone(),
2687            HydroNode::Persist {
2688                inner: Box::new(self.ir_node.into_inner()),
2689                metadata: self
2690                    .location
2691                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2692            },
2693        )
2694    }
2695
2696    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2697    /// always has the elements of `self` at tick `T - 1`.
2698    ///
2699    /// At tick `0`, the output stream is empty, since there is no previous tick.
2700    ///
2701    /// This operator enables stateful iterative processing with ticks, by sending data from one
2702    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2703    ///
2704    /// # Example
2705    /// ```rust
2706    /// # use hydro_lang::prelude::*;
2707    /// # use futures::StreamExt;
2708    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2709    /// let tick = process.tick();
2710    /// // ticks are lazy by default, forces the second tick to run
2711    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2712    ///
2713    /// let batch_first_tick = process
2714    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2715    ///   .batch(&tick, nondet!(/** test */));
2716    /// let batch_second_tick = process
2717    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2718    ///   .batch(&tick, nondet!(/** test */))
2719    ///   .defer_tick(); // appears on the second tick
2720    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2721    ///
2722    /// changes_across_ticks.clone().filter_not_in(
2723    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2724    /// ).all_ticks()
2725    /// # }, |mut stream| async move {
2726    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2727    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2728    /// #     assert_eq!(stream.next().await.unwrap(), w);
2729    /// # }
2730    /// # }));
2731    /// ```
2732    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2733        Stream::new(
2734            self.location.clone(),
2735            HydroNode::DeferTick {
2736                input: Box::new(self.ir_node.into_inner()),
2737                metadata: self
2738                    .location
2739                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2740            },
2741        )
2742    }
2743}
2744
2745#[cfg(test)]
2746mod tests {
2747    use futures::{SinkExt, StreamExt};
2748    use hydro_deploy::Deployment;
2749    use serde::{Deserialize, Serialize};
2750    use stageleft::q;
2751
2752    use crate::compile::builder::FlowBuilder;
2753    use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
2754    use crate::location::Location;
2755    use crate::nondet::nondet;
2756
2757    mod backtrace_chained_ops;
2758
    /// Marker type used as the tag of the first process in `first_ten_distributed`.
    struct P1 {}
    /// Marker type used as the tag of the second process (and of the external) in
    /// `first_ten_distributed`.
    struct P2 {}
2761
    /// Serializable payload that `first_ten_distributed` sends between processes.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        /// The number carried by this message.
        n: u32,
    }
2766
2767    #[tokio::test]
2768    async fn first_ten_distributed() {
2769        let mut deployment = Deployment::new();
2770
2771        let flow = FlowBuilder::new();
2772        let first_node = flow.process::<P1>();
2773        let second_node = flow.process::<P2>();
2774        let external = flow.external::<P2>();
2775
2776        let numbers = first_node.source_iter(q!(0..10));
2777        let out_port = numbers
2778            .map(q!(|n| SendOverNetwork { n }))
2779            .send_bincode(&second_node)
2780            .send_bincode_external(&external);
2781
2782        let nodes = flow
2783            .with_process(&first_node, deployment.Localhost())
2784            .with_process(&second_node, deployment.Localhost())
2785            .with_external(&external, deployment.Localhost())
2786            .deploy(&mut deployment);
2787
2788        deployment.deploy().await.unwrap();
2789
2790        let mut external_out = nodes.connect(out_port).await;
2791
2792        deployment.start().await.unwrap();
2793
2794        for i in 0..10 {
2795            assert_eq!(external_out.next().await.unwrap().n, i);
2796        }
2797    }
2798
2799    #[tokio::test]
2800    async fn first_cardinality() {
2801        let mut deployment = Deployment::new();
2802
2803        let flow = FlowBuilder::new();
2804        let node = flow.process::<()>();
2805        let external = flow.external::<()>();
2806
2807        let node_tick = node.tick();
2808        let count = node_tick
2809            .singleton(q!([1, 2, 3]))
2810            .into_stream()
2811            .flatten_ordered()
2812            .first()
2813            .into_stream()
2814            .count()
2815            .all_ticks()
2816            .send_bincode_external(&external);
2817
2818        let nodes = flow
2819            .with_process(&node, deployment.Localhost())
2820            .with_external(&external, deployment.Localhost())
2821            .deploy(&mut deployment);
2822
2823        deployment.deploy().await.unwrap();
2824
2825        let mut external_out = nodes.connect(count).await;
2826
2827        deployment.start().await.unwrap();
2828
2829        assert_eq!(external_out.next().await.unwrap(), 1);
2830    }
2831
2832    #[tokio::test]
2833    async fn unbounded_reduce_remembers_state() {
2834        let mut deployment = Deployment::new();
2835
2836        let flow = FlowBuilder::new();
2837        let node = flow.process::<()>();
2838        let external = flow.external::<()>();
2839
2840        let (input_port, input) = node.source_external_bincode(&external);
2841        let out = input
2842            .reduce(q!(|acc, v| *acc += v))
2843            .sample_eager(nondet!(/** test */))
2844            .send_bincode_external(&external);
2845
2846        let nodes = flow
2847            .with_process(&node, deployment.Localhost())
2848            .with_external(&external, deployment.Localhost())
2849            .deploy(&mut deployment);
2850
2851        deployment.deploy().await.unwrap();
2852
2853        let mut external_in = nodes.connect(input_port).await;
2854        let mut external_out = nodes.connect(out).await;
2855
2856        deployment.start().await.unwrap();
2857
2858        external_in.send(1).await.unwrap();
2859        assert_eq!(external_out.next().await.unwrap(), 1);
2860
2861        external_in.send(2).await.unwrap();
2862        assert_eq!(external_out.next().await.unwrap(), 3);
2863    }
2864
2865    #[tokio::test]
2866    async fn atomic_fold_replays_each_tick() {
2867        let mut deployment = Deployment::new();
2868
2869        let flow = FlowBuilder::new();
2870        let node = flow.process::<()>();
2871        let external = flow.external::<()>();
2872
2873        let (input_port, input) =
2874            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2875        let tick = node.tick();
2876
2877        let out = input
2878            .batch(&tick, nondet!(/** test */))
2879            .cross_singleton(
2880                node.source_iter(q!(vec![1, 2, 3]))
2881                    .atomic(&tick)
2882                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2883                    .snapshot_atomic(nondet!(/** test */)),
2884            )
2885            .all_ticks()
2886            .send_bincode_external(&external);
2887
2888        let nodes = flow
2889            .with_process(&node, deployment.Localhost())
2890            .with_external(&external, deployment.Localhost())
2891            .deploy(&mut deployment);
2892
2893        deployment.deploy().await.unwrap();
2894
2895        let mut external_in = nodes.connect(input_port).await;
2896        let mut external_out = nodes.connect(out).await;
2897
2898        deployment.start().await.unwrap();
2899
2900        external_in.send(1).await.unwrap();
2901        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2902
2903        external_in.send(2).await.unwrap();
2904        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2905    }
2906
2907    #[tokio::test]
2908    async fn unbounded_scan_remembers_state() {
2909        let mut deployment = Deployment::new();
2910
2911        let flow = FlowBuilder::new();
2912        let node = flow.process::<()>();
2913        let external = flow.external::<()>();
2914
2915        let (input_port, input) = node.source_external_bincode(&external);
2916        let out = input
2917            .scan(
2918                q!(|| 0),
2919                q!(|acc, v| {
2920                    *acc += v;
2921                    Some(*acc)
2922                }),
2923            )
2924            .send_bincode_external(&external);
2925
2926        let nodes = flow
2927            .with_process(&node, deployment.Localhost())
2928            .with_external(&external, deployment.Localhost())
2929            .deploy(&mut deployment);
2930
2931        deployment.deploy().await.unwrap();
2932
2933        let mut external_in = nodes.connect(input_port).await;
2934        let mut external_out = nodes.connect(out).await;
2935
2936        deployment.start().await.unwrap();
2937
2938        external_in.send(1).await.unwrap();
2939        assert_eq!(external_out.next().await.unwrap(), 1);
2940
2941        external_in.send(2).await.unwrap();
2942        assert_eq!(external_out.next().await.unwrap(), 3);
2943    }
2944
2945    #[tokio::test]
2946    async fn unbounded_enumerate_remembers_state() {
2947        let mut deployment = Deployment::new();
2948
2949        let flow = FlowBuilder::new();
2950        let node = flow.process::<()>();
2951        let external = flow.external::<()>();
2952
2953        let (input_port, input) = node.source_external_bincode(&external);
2954        let out = input.enumerate().send_bincode_external(&external);
2955
2956        let nodes = flow
2957            .with_process(&node, deployment.Localhost())
2958            .with_external(&external, deployment.Localhost())
2959            .deploy(&mut deployment);
2960
2961        deployment.deploy().await.unwrap();
2962
2963        let mut external_in = nodes.connect(input_port).await;
2964        let mut external_out = nodes.connect(out).await;
2965
2966        deployment.start().await.unwrap();
2967
2968        external_in.send(1).await.unwrap();
2969        assert_eq!(external_out.next().await.unwrap(), (0, 1));
2970
2971        external_in.send(2).await.unwrap();
2972        assert_eq!(external_out.next().await.unwrap(), (1, 2));
2973    }
2974
2975    #[tokio::test]
2976    async fn unbounded_unique_remembers_state() {
2977        let mut deployment = Deployment::new();
2978
2979        let flow = FlowBuilder::new();
2980        let node = flow.process::<()>();
2981        let external = flow.external::<()>();
2982
2983        let (input_port, input) =
2984            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2985        let out = input.unique().send_bincode_external(&external);
2986
2987        let nodes = flow
2988            .with_process(&node, deployment.Localhost())
2989            .with_external(&external, deployment.Localhost())
2990            .deploy(&mut deployment);
2991
2992        deployment.deploy().await.unwrap();
2993
2994        let mut external_in = nodes.connect(input_port).await;
2995        let mut external_out = nodes.connect(out).await;
2996
2997        deployment.start().await.unwrap();
2998
2999        external_in.send(1).await.unwrap();
3000        assert_eq!(external_out.next().await.unwrap(), 1);
3001
3002        external_in.send(2).await.unwrap();
3003        assert_eq!(external_out.next().await.unwrap(), 2);
3004
3005        external_in.send(1).await.unwrap();
3006        external_in.send(3).await.unwrap();
3007        assert_eq!(external_out.next().await.unwrap(), 3);
3008    }
3009
3010    #[test]
3011    #[should_panic]
3012    fn sim_batch_nondet_size() {
3013        let flow = FlowBuilder::new();
3014        let external = flow.external::<()>();
3015        let node = flow.process::<()>();
3016
3017        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);
3018
3019        let tick = node.tick();
3020        let out_port = input
3021            .batch(&tick, nondet!(/** test */))
3022            .count()
3023            .all_ticks()
3024            .send_bincode_external(&external);
3025
3026        flow.sim().exhaustive(async |mut compiled| {
3027            let in_send = compiled.connect(&port);
3028            let mut out_recv = compiled.connect(&out_port);
3029            compiled.launch();
3030
3031            in_send.send(()).unwrap();
3032            in_send.send(()).unwrap();
3033            in_send.send(()).unwrap();
3034
3035            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3036        });
3037    }
3038
3039    #[test]
3040    fn sim_batch_preserves_order() {
3041        let flow = FlowBuilder::new();
3042        let external = flow.external::<()>();
3043        let node = flow.process::<()>();
3044
3045        let (port, input) = node.source_external_bincode(&external);
3046
3047        let tick = node.tick();
3048        let out_port = input
3049            .batch(&tick, nondet!(/** test */))
3050            .all_ticks()
3051            .send_bincode_external(&external);
3052
3053        flow.sim().exhaustive(async |mut compiled| {
3054            let in_send = compiled.connect(&port);
3055            let out_recv = compiled.connect(&out_port);
3056            compiled.launch();
3057
3058            in_send.send(1).unwrap();
3059            in_send.send(2).unwrap();
3060            in_send.send(3).unwrap();
3061
3062            out_recv.assert_yields_only([1, 2, 3]).await;
3063        });
3064    }
3065
3066    #[test]
3067    fn sim_batch_unordered_shuffles_count() {
3068        let flow = FlowBuilder::new();
3069        let external = flow.external::<()>();
3070        let node = flow.process::<()>();
3071
3072        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);
3073
3074        let tick = node.tick();
3075        let batch = input.batch(&tick, nondet!(/** test */));
3076        let out_port = batch.all_ticks().send_bincode_external(&external);
3077
3078        let instance_count = flow.sim().exhaustive(async |mut compiled| {
3079            let in_send = compiled.connect(&port);
3080            let out_recv = compiled.connect(&out_port);
3081            compiled.launch();
3082
3083            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
3084            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3085        });
3086
3087        assert_eq!(
3088            instance_count,
3089            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3090        )
3091    }
3092}