hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::KeyedStream;
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::{Atomic, DeferTick, NoAtomic};
26use crate::location::{Location, NoTick, Tick, check_matching_location};
27use crate::nondet::{NonDet, nondet};
28
29pub mod networking;
30
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds spell out how any ordering combines with the others via
/// [`MinOrder`]: combined with itself or with [`TotalOrder`] the result is `Self`,
/// and combined with [`NoOrder`] the result is always [`NoOrder`].
///
/// The trait carries the `#[sealed::sealed]` attribute, so the set of implementors
/// is meant to be limited to the markers defined in this module.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
}
37
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker
/// and no value of it can be constructed.
pub enum TotalOrder {}

// `TotalOrder` is one of the two valid `Ordering` markers.
#[sealed::sealed]
impl Ordering for TotalOrder {}
45
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker
/// and no value of it can be constructed.
pub enum NoOrder {}

// `NoOrder` is one of the two valid `Ordering` markers.
#[sealed::sealed]
impl Ordering for NoOrder {}
55
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` and `Other` are the two orderings being combined; the result is
/// exposed as the associated type [`MinOrder::Min`]. The impls below cover all
/// four pairings of [`TotalOrder`] and [`NoOrder`]: the result is [`TotalOrder`]
/// only when both sides are [`TotalOrder`], and [`NoOrder`] otherwise.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// TotalOrder combined with NoOrder: NoOrder is weaker.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder combined with TotalOrder: the only pairing that stays totally ordered.
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder combined with TotalOrder: NoOrder is weaker.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder combined with NoOrder.
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
82
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds spell out how any retry guarantee combines with the
/// others via [`MinRetries`]: combined with itself or with [`ExactlyOnce`] the
/// result is `Self`, and combined with [`AtLeastOnce`] the result is always
/// [`AtLeastOnce`].
///
/// The trait carries the `#[sealed::sealed]` attribute, so the set of implementors
/// is meant to be limited to the markers defined in this module.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
}
91
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker
/// and no value of it can be constructed.
pub enum ExactlyOnce {}

// `ExactlyOnce` is one of the two valid `Retries` markers.
#[sealed::sealed]
impl Retries for ExactlyOnce {}
98
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited enum: it is only ever used as a type-level marker
/// and no value of it can be constructed.
pub enum AtLeastOnce {}

// `AtLeastOnce` is one of the two valid `Retries` markers.
#[sealed::sealed]
impl Retries for AtLeastOnce {}
105
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` and `Other` are the two guarantees being combined; the result is
/// exposed as the associated type [`MinRetries::Min`]. The impls below cover all
/// four pairings of [`ExactlyOnce`] and [`AtLeastOnce`]: the result is
/// [`ExactlyOnce`] only when both sides are [`ExactlyOnce`], and [`AtLeastOnce`]
/// otherwise.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}

// ExactlyOnce combined with AtLeastOnce: AtLeastOnce is weaker.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce combined with ExactlyOnce: the only pairing that stays exactly-once.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce combined with ExactlyOnce: AtLeastOnce is weaker.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce combined with AtLeastOnce.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
132
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. It lives in a `RefCell` so that it
    /// can be swapped out through a shared reference (the `Clone` impl in this
    /// file replaces it with a `Tee` node).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Ties the otherwise unused type parameters to the struct; holds no data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
164
165impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
166    for Stream<T, L, Unbounded, O, R>
167where
168    L: Location<'a>,
169{
170    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
171        Stream {
172            location: stream.location,
173            ir_node: stream.ir_node,
174            _phantom: PhantomData,
175        }
176    }
177}
178
179impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
180    for Stream<T, L, B, NoOrder, R>
181where
182    L: Location<'a>,
183{
184    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
185        Stream {
186            location: stream.location,
187            ir_node: stream.ir_node,
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
194    for Stream<T, L, B, O, AtLeastOnce>
195where
196    L: Location<'a>,
197{
198    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
199        Stream {
200            location: stream.location,
201            ir_node: stream.ir_node,
202            _phantom: PhantomData,
203        }
204    }
205}
206
/// Implements [`DeferTick`] for bounded streams located in a [`Tick`] by
/// delegating to `Stream::defer_tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably this path resolves to an inherent
        // `Stream::defer_tick` defined outside this excerpt rather than back
        // to this trait method — confirm before changing the call syntax.
        Stream::defer_tick(self)
    }
}
215
216impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
217    for Stream<T, Tick<L>, Bounded, O, R>
218where
219    L: Location<'a>,
220{
221    type Location = Tick<L>;
222
223    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
224        Stream::new(
225            location.clone(),
226            HydroNode::CycleSource {
227                ident,
228                metadata: location.new_node_metadata::<T>(),
229            },
230        )
231    }
232}
233
234impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
235    for Stream<T, Tick<L>, Bounded, O, R>
236where
237    L: Location<'a>,
238{
239    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
240        assert_eq!(
241            Location::id(&self.location),
242            expected_location,
243            "locations do not match"
244        );
245        self.location
246            .flow_state()
247            .borrow_mut()
248            .push_root(HydroRoot::CycleSink {
249                ident,
250                input: Box::new(self.ir_node.into_inner()),
251                out_location: Location::id(&self.location),
252                op_metadata: HydroIrOpMetadata::new(),
253            });
254    }
255}
256
257impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
258    for Stream<T, L, B, O, R>
259where
260    L: Location<'a> + NoTick,
261{
262    type Location = L;
263
264    fn create_source(ident: syn::Ident, location: L) -> Self {
265        Stream::new(
266            location.clone(),
267            HydroNode::Persist {
268                inner: Box::new(HydroNode::CycleSource {
269                    ident,
270                    metadata: location.new_node_metadata::<T>(),
271                }),
272                metadata: location.new_node_metadata::<T>(),
273            },
274        )
275    }
276}
277
278impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
279    for Stream<T, L, B, O, R>
280where
281    L: Location<'a> + NoTick,
282{
283    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
284        assert_eq!(
285            Location::id(&self.location),
286            expected_location,
287            "locations do not match"
288        );
289        let metadata = self.location.new_node_metadata::<T>();
290        self.location
291            .flow_state()
292            .borrow_mut()
293            .push_root(HydroRoot::CycleSink {
294                ident,
295                input: Box::new(HydroNode::Unpersist {
296                    inner: Box::new(self.ir_node.into_inner()),
297                    metadata: metadata.clone(),
298                }),
299                out_location: Location::id(&self.location),
300                op_metadata: HydroIrOpMetadata::new(),
301            });
302    }
303}
304
305impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
306where
307    T: Clone,
308    L: Location<'a>,
309{
310    fn clone(&self) -> Self {
311        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
312            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
313            *self.ir_node.borrow_mut() = HydroNode::Tee {
314                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
315                metadata: self.location.new_node_metadata::<T>(),
316            };
317        }
318
319        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
320            Stream {
321                location: self.location.clone(),
322                ir_node: HydroNode::Tee {
323                    inner: TeeNode(inner.0.clone()),
324                    metadata: metadata.clone(),
325                }
326                .into(),
327                _phantom: PhantomData,
328            }
329        } else {
330            unreachable!()
331        }
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
336where
337    L: Location<'a>,
338{
339    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
340        Stream {
341            location,
342            ir_node: RefCell::new(ir_node),
343            _phantom: PhantomData,
344        }
345    }
346
347    /// Produces a stream based on invoking `f` on each element.
348    /// If you do not want to modify the stream and instead only want to view
349    /// each item use [`Stream::inspect`] instead.
350    ///
351    /// # Example
352    /// ```rust
353    /// # use hydro_lang::prelude::*;
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
356    /// let words = process.source_iter(q!(vec!["hello", "world"]));
357    /// words.map(q!(|x| x.to_uppercase()))
358    /// # }, |mut stream| async move {
359    /// # for w in vec!["HELLO", "WORLD"] {
360    /// #     assert_eq!(stream.next().await.unwrap(), w);
361    /// # }
362    /// # }));
363    /// ```
364    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
365    where
366        F: Fn(T) -> U + 'a,
367    {
368        let f = f.splice_fn1_ctx(&self.location).into();
369        Stream::new(
370            self.location.clone(),
371            HydroNode::Map {
372                f,
373                input: Box::new(self.ir_node.into_inner()),
374                metadata: self.location.new_node_metadata::<U>(),
375            },
376        )
377    }
378
379    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
380    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
381    /// for the output type `U` must produce items in a **deterministic** order.
382    ///
383    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
384    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
385    ///
386    /// # Example
387    /// ```rust
388    /// # use hydro_lang::prelude::*;
389    /// # use futures::StreamExt;
390    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
391    /// process
392    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
393    ///     .flat_map_ordered(q!(|x| x))
394    /// # }, |mut stream| async move {
395    /// // 1, 2, 3, 4
396    /// # for w in (1..5) {
397    /// #     assert_eq!(stream.next().await.unwrap(), w);
398    /// # }
399    /// # }));
400    /// ```
401    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
402    where
403        I: IntoIterator<Item = U>,
404        F: Fn(T) -> I + 'a,
405    {
406        let f = f.splice_fn1_ctx(&self.location).into();
407        Stream::new(
408            self.location.clone(),
409            HydroNode::FlatMap {
410                f,
411                input: Box::new(self.ir_node.into_inner()),
412                metadata: self.location.new_node_metadata::<U>(),
413            },
414        )
415    }
416
417    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
418    /// for the output type `U` to produce items in any order.
419    ///
420    /// # Example
421    /// ```rust
422    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
423    /// # use futures::StreamExt;
424    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
425    /// process
426    ///     .source_iter(q!(vec![
427    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
428    ///         std::collections::HashSet::from_iter(vec![3, 4]),
429    ///     ]))
430    ///     .flat_map_unordered(q!(|x| x))
431    /// # }, |mut stream| async move {
432    /// // 1, 2, 3, 4, but in no particular order
433    /// # let mut results = Vec::new();
434    /// # for w in (1..5) {
435    /// #     results.push(stream.next().await.unwrap());
436    /// # }
437    /// # results.sort();
438    /// # assert_eq!(results, vec![1, 2, 3, 4]);
439    /// # }));
440    /// ```
441    pub fn flat_map_unordered<U, I, F>(
442        self,
443        f: impl IntoQuotedMut<'a, F, L>,
444    ) -> Stream<U, L, B, NoOrder, R>
445    where
446        I: IntoIterator<Item = U>,
447        F: Fn(T) -> I + 'a,
448    {
449        let f = f.splice_fn1_ctx(&self.location).into();
450        Stream::new(
451            self.location.clone(),
452            HydroNode::FlatMap {
453                f,
454                input: Box::new(self.ir_node.into_inner()),
455                metadata: self.location.new_node_metadata::<U>(),
456            },
457        )
458    }
459
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Stream::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Each element is already iterable, so the identity closure suffices.
        self.flat_map_ordered(q!(|d| d))
    }
486
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// This is shorthand for [`Stream::flat_map_unordered`] with the identity closure,
    /// so the output stream is always [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Each element is already iterable, so the identity closure suffices.
        self.flat_map_unordered(q!(|d| d))
    }
517
518    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
519    /// `f`, preserving the order of the elements.
520    ///
521    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
522    /// not modify or take ownership of the values. If you need to modify the values while filtering
523    /// use [`Stream::filter_map`] instead.
524    ///
525    /// # Example
526    /// ```rust
527    /// # use hydro_lang::prelude::*;
528    /// # use futures::StreamExt;
529    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530    /// process
531    ///     .source_iter(q!(vec![1, 2, 3, 4]))
532    ///     .filter(q!(|&x| x > 2))
533    /// # }, |mut stream| async move {
534    /// // 3, 4
535    /// # for w in (3..5) {
536    /// #     assert_eq!(stream.next().await.unwrap(), w);
537    /// # }
538    /// # }));
539    /// ```
540    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
541    where
542        F: Fn(&T) -> bool + 'a,
543    {
544        let f = f.splice_fn1_borrow_ctx(&self.location).into();
545        Stream::new(
546            self.location.clone(),
547            HydroNode::Filter {
548                f,
549                input: Box::new(self.ir_node.into_inner()),
550                metadata: self.location.new_node_metadata::<T>(),
551            },
552        )
553    }
554
555    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
556    ///
557    /// # Example
558    /// ```rust
559    /// # use hydro_lang::prelude::*;
560    /// # use futures::StreamExt;
561    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
562    /// process
563    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
564    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
565    /// # }, |mut stream| async move {
566    /// // 1, 2
567    /// # for w in (1..3) {
568    /// #     assert_eq!(stream.next().await.unwrap(), w);
569    /// # }
570    /// # }));
571    /// ```
572    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
573    where
574        F: Fn(T) -> Option<U> + 'a,
575    {
576        let f = f.splice_fn1_ctx(&self.location).into();
577        Stream::new(
578            self.location.clone(),
579            HydroNode::FilterMap {
580                f,
581                input: Box::new(self.ir_node.into_inner()),
582                metadata: self.location.new_node_metadata::<U>(),
583            },
584        )
585    }
586
587    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
588    /// where `x` is the final value of `other`, a bounded [`Singleton`].
589    ///
590    /// # Example
591    /// ```rust
592    /// # use hydro_lang::prelude::*;
593    /// # use futures::StreamExt;
594    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
595    /// let tick = process.tick();
596    /// let batch = process
597    ///   .source_iter(q!(vec![1, 2, 3, 4]))
598    ///   .batch(&tick, nondet!(/** test */));
599    /// let count = batch.clone().count(); // `count()` returns a singleton
600    /// batch.cross_singleton(count).all_ticks()
601    /// # }, |mut stream| async move {
602    /// // (1, 4), (2, 4), (3, 4), (4, 4)
603    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
604    /// #     assert_eq!(stream.next().await.unwrap(), w);
605    /// # }
606    /// # }));
607    /// ```
608    pub fn cross_singleton<O2>(
609        self,
610        other: impl Into<Optional<O2, L, Bounded>>,
611    ) -> Stream<(T, O2), L, B, O, R>
612    where
613        O2: Clone,
614    {
615        let other: Optional<O2, L, Bounded> = other.into();
616        check_matching_location(&self.location, &other.location);
617
618        Stream::new(
619            self.location.clone(),
620            HydroNode::CrossSingleton {
621                left: Box::new(self.ir_node.into_inner()),
622                right: Box::new(other.ir_node.into_inner()),
623                metadata: self.location.new_node_metadata::<(T, O2)>(),
624            },
625        )
626    }
627
628    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
629    ///
630    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
631    /// leader of a cluster.
632    ///
633    /// # Example
634    /// ```rust
635    /// # use hydro_lang::prelude::*;
636    /// # use futures::StreamExt;
637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638    /// let tick = process.tick();
639    /// // ticks are lazy by default, forces the second tick to run
640    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
641    ///
642    /// let batch_first_tick = process
643    ///   .source_iter(q!(vec![1, 2, 3, 4]))
644    ///   .batch(&tick, nondet!(/** test */));
645    /// let batch_second_tick = process
646    ///   .source_iter(q!(vec![5, 6, 7, 8]))
647    ///   .batch(&tick, nondet!(/** test */))
648    ///   .defer_tick(); // appears on the second tick
649    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
650    /// batch_first_tick.chain(batch_second_tick)
651    ///   .filter_if_some(some_on_first_tick)
652    ///   .all_ticks()
653    /// # }, |mut stream| async move {
654    /// // [1, 2, 3, 4]
655    /// # for w in vec![1, 2, 3, 4] {
656    /// #     assert_eq!(stream.next().await.unwrap(), w);
657    /// # }
658    /// # }));
659    /// ```
660    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
661        self.cross_singleton(signal.map(q!(|_u| ())))
662            .map(q!(|(d, _signal)| d))
663    }
664
665    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
666    ///
667    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
668    /// some local state.
669    ///
670    /// # Example
671    /// ```rust
672    /// # use hydro_lang::prelude::*;
673    /// # use futures::StreamExt;
674    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
675    /// let tick = process.tick();
676    /// // ticks are lazy by default, forces the second tick to run
677    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
678    ///
679    /// let batch_first_tick = process
680    ///   .source_iter(q!(vec![1, 2, 3, 4]))
681    ///   .batch(&tick, nondet!(/** test */));
682    /// let batch_second_tick = process
683    ///   .source_iter(q!(vec![5, 6, 7, 8]))
684    ///   .batch(&tick, nondet!(/** test */))
685    ///   .defer_tick(); // appears on the second tick
686    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
687    /// batch_first_tick.chain(batch_second_tick)
688    ///   .filter_if_none(some_on_first_tick)
689    ///   .all_ticks()
690    /// # }, |mut stream| async move {
691    /// // [5, 6, 7, 8]
692    /// # for w in vec![5, 6, 7, 8] {
693    /// #     assert_eq!(stream.next().await.unwrap(), w);
694    /// # }
695    /// # }));
696    /// ```
697    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
698        self.filter_if_some(
699            other
700                .map(q!(|_| ()))
701                .into_singleton()
702                .filter(q!(|o| o.is_none())),
703        )
704    }
705
706    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
707    /// tupled pairs in a non-deterministic order.
708    ///
709    /// # Example
710    /// ```rust
711    /// # use hydro_lang::prelude::*;
712    /// # use std::collections::HashSet;
713    /// # use futures::StreamExt;
714    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
715    /// let tick = process.tick();
716    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
717    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
718    /// stream1.cross_product(stream2)
719    /// # }, |mut stream| async move {
720    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
721    /// # stream.map(|i| assert!(expected.contains(&i)));
722    /// # }));
723    /// ```
724    pub fn cross_product<T2, O2: Ordering>(
725        self,
726        other: Stream<T2, L, B, O2, R>,
727    ) -> Stream<(T, T2), L, B, NoOrder, R>
728    where
729        T: Clone,
730        T2: Clone,
731    {
732        check_matching_location(&self.location, &other.location);
733
734        Stream::new(
735            self.location.clone(),
736            HydroNode::CrossProduct {
737                left: Box::new(self.ir_node.into_inner()),
738                right: Box::new(other.ir_node.into_inner()),
739                metadata: self.location.new_node_metadata::<(T, T2)>(),
740            },
741        )
742    }
743
744    /// Takes one stream as input and filters out any duplicate occurrences. The output
745    /// contains all unique values from the input.
746    ///
747    /// # Example
748    /// ```rust
749    /// # use hydro_lang::prelude::*;
750    /// # use futures::StreamExt;
751    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
752    /// let tick = process.tick();
753    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
754    /// # }, |mut stream| async move {
755    /// # for w in vec![1, 2, 3, 4] {
756    /// #     assert_eq!(stream.next().await.unwrap(), w);
757    /// # }
758    /// # }));
759    /// ```
760    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
761    where
762        T: Eq + Hash,
763    {
764        Stream::new(
765            self.location.clone(),
766            HydroNode::Unique {
767                input: Box::new(self.ir_node.into_inner()),
768                metadata: self.location.new_node_metadata::<T>(),
769            },
770        )
771    }
772
773    /// Outputs everything in this stream that is *not* contained in the `other` stream.
774    ///
775    /// The `other` stream must be [`Bounded`], since this function will wait until
776    /// all its elements are available before producing any output.
777    /// # Example
778    /// ```rust
779    /// # use hydro_lang::prelude::*;
780    /// # use futures::StreamExt;
781    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782    /// let tick = process.tick();
783    /// let stream = process
784    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
785    ///   .batch(&tick, nondet!(/** test */));
786    /// let batch = process
787    ///   .source_iter(q!(vec![1, 2]))
788    ///   .batch(&tick, nondet!(/** test */));
789    /// stream.filter_not_in(batch).all_ticks()
790    /// # }, |mut stream| async move {
791    /// # for w in vec![3, 4] {
792    /// #     assert_eq!(stream.next().await.unwrap(), w);
793    /// # }
794    /// # }));
795    /// ```
796    pub fn filter_not_in<O2: Ordering>(
797        self,
798        other: Stream<T, L, Bounded, O2, R>,
799    ) -> Stream<T, L, Bounded, O, R>
800    where
801        T: Eq + Hash,
802    {
803        check_matching_location(&self.location, &other.location);
804
805        Stream::new(
806            self.location.clone(),
807            HydroNode::Difference {
808                pos: Box::new(self.ir_node.into_inner()),
809                neg: Box::new(other.ir_node.into_inner()),
810                metadata: self.location.new_node_metadata::<T>(),
811            },
812        )
813    }
814
815    /// An operator which allows you to "inspect" each element of a stream without
816    /// modifying it. The closure `f` is called on a reference to each item. This is
817    /// mainly useful for debugging, and should not be used to generate side-effects.
818    ///
819    /// # Example
820    /// ```rust
821    /// # use hydro_lang::prelude::*;
822    /// # use futures::StreamExt;
823    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
824    /// let nums = process.source_iter(q!(vec![1, 2]));
825    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
826    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
827    /// # }, |mut stream| async move {
828    /// # for w in vec![1, 2] {
829    /// #     assert_eq!(stream.next().await.unwrap(), w);
830    /// # }
831    /// # }));
832    /// ```
833    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
834    where
835        F: Fn(&T) + 'a,
836    {
837        let f = f.splice_fn1_borrow_ctx(&self.location).into();
838
839        if L::is_top_level() {
840            Stream::new(
841                self.location.clone(),
842                HydroNode::Persist {
843                    inner: Box::new(HydroNode::Inspect {
844                        f,
845                        input: Box::new(HydroNode::Unpersist {
846                            inner: Box::new(self.ir_node.into_inner()),
847                            metadata: self.location.new_node_metadata::<T>(),
848                        }),
849                        metadata: self.location.new_node_metadata::<T>(),
850                    }),
851                    metadata: self.location.new_node_metadata::<T>(),
852                },
853            )
854        } else {
855            Stream::new(
856                self.location.clone(),
857                HydroNode::Inspect {
858                    f,
859                    input: Box::new(self.ir_node.into_inner()),
860                    metadata: self.location.new_node_metadata::<T>(),
861                },
862            )
863        }
864    }
865
866    /// An operator which allows you to "name" a `HydroNode`.
867    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
868    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
869        {
870            let mut node = self.ir_node.borrow_mut();
871            let metadata = node.metadata_mut();
872            metadata.tag = Some(name.to_string());
873        }
874        self
875    }
876
877    /// Explicitly "casts" the stream to a type with a different ordering
878    /// guarantee. Useful in unsafe code where the ordering cannot be proven
879    /// by the type-system.
880    ///
881    /// # Non-Determinism
882    /// This function is used as an escape hatch, and any mistakes in the
883    /// provided ordering guarantee will propagate into the guarantees
884    /// for the rest of the program.
885    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
886        Stream::new(self.location, self.ir_node.into_inner())
887    }
888
889    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
890    /// which is always safe because that is the weakest possible guarantee.
891    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
892        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
893        self.assume_ordering::<NoOrder>(nondet)
894    }
895
896    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
897    /// enforcing that `O2` is weaker than the input ordering guarantee.
898    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
899        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
900        self.assume_ordering::<O2>(nondet)
901    }
902
903    /// Explicitly "casts" the stream to a type with a different retries
904    /// guarantee. Useful in unsafe code where the lack of retries cannot
905    /// be proven by the type-system.
906    ///
907    /// # Non-Determinism
908    /// This function is used as an escape hatch, and any mistakes in the
909    /// provided retries guarantee will propagate into the guarantees
910    /// for the rest of the program.
911    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
912        Stream::new(self.location, self.ir_node.into_inner())
913    }
914
915    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
916    /// which is always safe because that is the weakest possible guarantee.
917    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
918        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
919        self.assume_retries::<AtLeastOnce>(nondet)
920    }
921
922    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
923    /// enforcing that `R2` is weaker than the input retries guarantee.
924    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
925        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
926        self.assume_retries::<R2>(nondet)
927    }
928}
929
930impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
931where
932    L: Location<'a>,
933{
934    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
935    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
936    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
937        self.assume_retries(
938            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
939        )
940    }
941}
942
943impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
944where
945    L: Location<'a>,
946{
947    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
948    ///
949    /// # Example
950    /// ```rust
951    /// # use hydro_lang::prelude::*;
952    /// # use futures::StreamExt;
953    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
954    /// process.source_iter(q!(&[1, 2, 3])).cloned()
955    /// # }, |mut stream| async move {
956    /// // 1, 2, 3
957    /// # for w in vec![1, 2, 3] {
958    /// #     assert_eq!(stream.next().await.unwrap(), w);
959    /// # }
960    /// # }));
961    /// ```
962    pub fn cloned(self) -> Stream<T, L, B, O, R>
963    where
964        T: Clone,
965    {
966        self.map(q!(|d| d.clone()))
967    }
968}
969
970impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
971where
972    L: Location<'a>,
973{
974    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
975    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
976    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
977    ///
978    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
979    /// and there may be duplicates.
980    ///
981    /// # Example
982    /// ```rust
983    /// # use hydro_lang::prelude::*;
984    /// # use futures::StreamExt;
985    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
986    /// let tick = process.tick();
987    /// let bools = process.source_iter(q!(vec![false, true, false]));
988    /// let batch = bools.batch(&tick, nondet!(/** test */));
989    /// batch
990    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
991    ///     .all_ticks()
992    /// # }, |mut stream| async move {
993    /// // true
994    /// # assert_eq!(stream.next().await.unwrap(), true);
995    /// # }));
996    /// ```
997    pub fn fold_commutative_idempotent<A, I, F>(
998        self,
999        init: impl IntoQuotedMut<'a, I, L>,
1000        comb: impl IntoQuotedMut<'a, F, L>,
1001    ) -> Singleton<A, L, B>
1002    where
1003        I: Fn() -> A + 'a,
1004        F: Fn(&mut A, T),
1005    {
1006        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1007        self.assume_ordering(nondet)
1008            .assume_retries(nondet)
1009            .fold(init, comb)
1010    }
1011
1012    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1013    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1014    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1015    /// reference, so that it can be modified in place.
1016    ///
1017    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1018    /// and there may be duplicates.
1019    ///
1020    /// # Example
1021    /// ```rust
1022    /// # use hydro_lang::prelude::*;
1023    /// # use futures::StreamExt;
1024    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1025    /// let tick = process.tick();
1026    /// let bools = process.source_iter(q!(vec![false, true, false]));
1027    /// let batch = bools.batch(&tick, nondet!(/** test */));
1028    /// batch
1029    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1030    ///     .all_ticks()
1031    /// # }, |mut stream| async move {
1032    /// // true
1033    /// # assert_eq!(stream.next().await.unwrap(), true);
1034    /// # }));
1035    /// ```
1036    pub fn reduce_commutative_idempotent<F>(
1037        self,
1038        comb: impl IntoQuotedMut<'a, F, L>,
1039    ) -> Optional<T, L, B>
1040    where
1041        F: Fn(&mut T, T) + 'a,
1042    {
1043        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1044        self.assume_ordering(nondet)
1045            .assume_retries(nondet)
1046            .reduce(comb)
1047    }
1048
1049    /// Computes the maximum element in the stream as an [`Optional`], which
1050    /// will be empty until the first element in the input arrives.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # use hydro_lang::prelude::*;
1055    /// # use futures::StreamExt;
1056    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1057    /// let tick = process.tick();
1058    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1059    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1060    /// batch.max().all_ticks()
1061    /// # }, |mut stream| async move {
1062    /// // 4
1063    /// # assert_eq!(stream.next().await.unwrap(), 4);
1064    /// # }));
1065    /// ```
1066    pub fn max(self) -> Optional<T, L, B>
1067    where
1068        T: Ord,
1069    {
1070        self.reduce_commutative_idempotent(q!(|curr, new| {
1071            if new > *curr {
1072                *curr = new;
1073            }
1074        }))
1075    }
1076
1077    /// Computes the maximum element in the stream as an [`Optional`], where the
1078    /// maximum is determined according to the `key` function. The [`Optional`] will
1079    /// be empty until the first element in the input arrives.
1080    ///
1081    /// # Example
1082    /// ```rust
1083    /// # use hydro_lang::prelude::*;
1084    /// # use futures::StreamExt;
1085    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086    /// let tick = process.tick();
1087    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1088    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1089    /// batch.max_by_key(q!(|x| -x)).all_ticks()
1090    /// # }, |mut stream| async move {
1091    /// // 1
1092    /// # assert_eq!(stream.next().await.unwrap(), 1);
1093    /// # }));
1094    /// ```
1095    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1096    where
1097        K: Ord,
1098        F: Fn(&T) -> K + 'a,
1099    {
1100        let f = key.splice_fn1_borrow_ctx(&self.location);
1101
1102        let wrapped: syn::Expr = parse_quote!({
1103            let key_fn = #f;
1104            move |curr, new| {
1105                if key_fn(&new) > key_fn(&*curr) {
1106                    *curr = new;
1107                }
1108            }
1109        });
1110
1111        let mut core = HydroNode::Reduce {
1112            f: wrapped.into(),
1113            input: Box::new(self.ir_node.into_inner()),
1114            metadata: self.location.new_node_metadata::<T>(),
1115        };
1116
1117        if L::is_top_level() {
1118            core = HydroNode::Persist {
1119                inner: Box::new(core),
1120                metadata: self.location.new_node_metadata::<T>(),
1121            };
1122        }
1123
1124        Optional::new(self.location, core)
1125    }
1126
1127    /// Computes the minimum element in the stream as an [`Optional`], which
1128    /// will be empty until the first element in the input arrives.
1129    ///
1130    /// # Example
1131    /// ```rust
1132    /// # use hydro_lang::prelude::*;
1133    /// # use futures::StreamExt;
1134    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1135    /// let tick = process.tick();
1136    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1138    /// batch.min().all_ticks()
1139    /// # }, |mut stream| async move {
1140    /// // 1
1141    /// # assert_eq!(stream.next().await.unwrap(), 1);
1142    /// # }));
1143    /// ```
1144    pub fn min(self) -> Optional<T, L, B>
1145    where
1146        T: Ord,
1147    {
1148        self.reduce_commutative_idempotent(q!(|curr, new| {
1149            if new < *curr {
1150                *curr = new;
1151            }
1152        }))
1153    }
1154}
1155
1156impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1157where
1158    L: Location<'a>,
1159{
1160    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1161    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1162    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1163    ///
1164    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1165    ///
1166    /// # Example
1167    /// ```rust
1168    /// # use hydro_lang::prelude::*;
1169    /// # use futures::StreamExt;
1170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171    /// let tick = process.tick();
1172    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1173    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1174    /// batch
1175    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1176    ///     .all_ticks()
1177    /// # }, |mut stream| async move {
1178    /// // 10
1179    /// # assert_eq!(stream.next().await.unwrap(), 10);
1180    /// # }));
1181    /// ```
1182    pub fn fold_commutative<A, I, F>(
1183        self,
1184        init: impl IntoQuotedMut<'a, I, L>,
1185        comb: impl IntoQuotedMut<'a, F, L>,
1186    ) -> Singleton<A, L, B>
1187    where
1188        I: Fn() -> A + 'a,
1189        F: Fn(&mut A, T),
1190    {
1191        let nondet = nondet!(/** the combinator function is commutative */);
1192        self.assume_ordering(nondet).fold(init, comb)
1193    }
1194
1195    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1196    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1197    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1198    /// reference, so that it can be modified in place.
1199    ///
1200    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1201    ///
1202    /// # Example
1203    /// ```rust
1204    /// # use hydro_lang::prelude::*;
1205    /// # use futures::StreamExt;
1206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1207    /// let tick = process.tick();
1208    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1209    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1210    /// batch
1211    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1212    ///     .all_ticks()
1213    /// # }, |mut stream| async move {
1214    /// // 10
1215    /// # assert_eq!(stream.next().await.unwrap(), 10);
1216    /// # }));
1217    /// ```
1218    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1219    where
1220        F: Fn(&mut T, T) + 'a,
1221    {
1222        let nondet = nondet!(/** the combinator function is commutative */);
1223        self.assume_ordering(nondet).reduce(comb)
1224    }
1225
1226    /// Computes the number of elements in the stream as a [`Singleton`].
1227    ///
1228    /// # Example
1229    /// ```rust
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let tick = process.tick();
1234    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1235    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1236    /// batch.count().all_ticks()
1237    /// # }, |mut stream| async move {
1238    /// // 4
1239    /// # assert_eq!(stream.next().await.unwrap(), 4);
1240    /// # }));
1241    /// ```
1242    pub fn count(self) -> Singleton<usize, L, B> {
1243        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1244    }
1245}
1246
1247impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1248where
1249    L: Location<'a>,
1250{
1251    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1252    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1253    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1254    ///
1255    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1256    ///
1257    /// # Example
1258    /// ```rust
1259    /// # use hydro_lang::prelude::*;
1260    /// # use futures::StreamExt;
1261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262    /// let tick = process.tick();
1263    /// let bools = process.source_iter(q!(vec![false, true, false]));
1264    /// let batch = bools.batch(&tick, nondet!(/** test */));
1265    /// batch
1266    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1267    ///     .all_ticks()
1268    /// # }, |mut stream| async move {
1269    /// // true
1270    /// # assert_eq!(stream.next().await.unwrap(), true);
1271    /// # }));
1272    /// ```
1273    pub fn fold_idempotent<A, I, F>(
1274        self,
1275        init: impl IntoQuotedMut<'a, I, L>,
1276        comb: impl IntoQuotedMut<'a, F, L>,
1277    ) -> Singleton<A, L, B>
1278    where
1279        I: Fn() -> A + 'a,
1280        F: Fn(&mut A, T),
1281    {
1282        let nondet = nondet!(/** the combinator function is idempotent */);
1283        self.assume_retries(nondet).fold(init, comb)
1284    }
1285
1286    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1287    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1288    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1289    /// reference, so that it can be modified in place.
1290    ///
1291    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1292    ///
1293    /// # Example
1294    /// ```rust
1295    /// # use hydro_lang::prelude::*;
1296    /// # use futures::StreamExt;
1297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298    /// let tick = process.tick();
1299    /// let bools = process.source_iter(q!(vec![false, true, false]));
1300    /// let batch = bools.batch(&tick, nondet!(/** test */));
1301    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1302    /// # }, |mut stream| async move {
1303    /// // true
1304    /// # assert_eq!(stream.next().await.unwrap(), true);
1305    /// # }));
1306    /// ```
1307    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1308    where
1309        F: Fn(&mut T, T) + 'a,
1310    {
1311        let nondet = nondet!(/** the combinator function is idempotent */);
1312        self.assume_retries(nondet).reduce(comb)
1313    }
1314
1315    /// Computes the first element in the stream as an [`Optional`], which
1316    /// will be empty until the first element in the input arrives.
1317    ///
1318    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1319    /// re-ordering of elements may cause the first element to change.
1320    ///
1321    /// # Example
1322    /// ```rust
1323    /// # use hydro_lang::prelude::*;
1324    /// # use futures::StreamExt;
1325    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326    /// let tick = process.tick();
1327    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1328    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1329    /// batch.first().all_ticks()
1330    /// # }, |mut stream| async move {
1331    /// // 1
1332    /// # assert_eq!(stream.next().await.unwrap(), 1);
1333    /// # }));
1334    /// ```
1335    pub fn first(self) -> Optional<T, L, B> {
1336        self.reduce_idempotent(q!(|_, _| {}))
1337    }
1338
1339    /// Computes the last element in the stream as an [`Optional`], which
1340    /// will be empty until an element in the input arrives.
1341    ///
1342    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1343    /// re-ordering of elements may cause the last element to change.
1344    ///
1345    /// # Example
1346    /// ```rust
1347    /// # use hydro_lang::prelude::*;
1348    /// # use futures::StreamExt;
1349    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1350    /// let tick = process.tick();
1351    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1352    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1353    /// batch.last().all_ticks()
1354    /// # }, |mut stream| async move {
1355    /// // 4
1356    /// # assert_eq!(stream.next().await.unwrap(), 4);
1357    /// # }));
1358    /// ```
1359    pub fn last(self) -> Optional<T, L, B> {
1360        self.reduce_idempotent(q!(|curr, new| *curr = new))
1361    }
1362}
1363
1364impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1365where
1366    L: Location<'a>,
1367{
1368    /// Returns a stream with the current count tupled with each element in the input stream.
1369    ///
1370    /// # Example
1371    /// ```rust
1372    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1373    /// # use futures::StreamExt;
1374    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1375    /// let tick = process.tick();
1376    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1377    /// numbers.enumerate()
1378    /// # }, |mut stream| async move {
1379    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1380    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1381    /// #     assert_eq!(stream.next().await.unwrap(), w);
1382    /// # }
1383    /// # }));
1384    /// ```
1385    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1386        if L::is_top_level() {
1387            Stream::new(
1388                self.location.clone(),
1389                HydroNode::Persist {
1390                    inner: Box::new(HydroNode::Enumerate {
1391                        is_static: true,
1392                        input: Box::new(HydroNode::Unpersist {
1393                            inner: Box::new(self.ir_node.into_inner()),
1394                            metadata: self.location.new_node_metadata::<T>(),
1395                        }),
1396                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1397                    }),
1398                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1399                },
1400            )
1401        } else {
1402            Stream::new(
1403                self.location.clone(),
1404                HydroNode::Enumerate {
1405                    is_static: false,
1406                    input: Box::new(self.ir_node.into_inner()),
1407                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1408                },
1409            )
1410        }
1411    }
1412
1413    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1414    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1415    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1416    ///
1417    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1418    /// to depend on the order of elements in the stream.
1419    ///
1420    /// # Example
1421    /// ```rust
1422    /// # use hydro_lang::prelude::*;
1423    /// # use futures::StreamExt;
1424    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1425    /// let tick = process.tick();
1426    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1427    /// let batch = words.batch(&tick, nondet!(/** test */));
1428    /// batch
1429    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1430    ///     .all_ticks()
1431    /// # }, |mut stream| async move {
1432    /// // "HELLOWORLD"
1433    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1434    /// # }));
1435    /// ```
1436    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1437        self,
1438        init: impl IntoQuotedMut<'a, I, L>,
1439        comb: impl IntoQuotedMut<'a, F, L>,
1440    ) -> Singleton<A, L, B> {
1441        let init = init.splice_fn0_ctx(&self.location).into();
1442        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1443
1444        let mut core = HydroNode::Fold {
1445            init,
1446            acc: comb,
1447            input: Box::new(self.ir_node.into_inner()),
1448            metadata: self.location.new_node_metadata::<A>(),
1449        };
1450
1451        if L::is_top_level() {
1452            // top-level (possibly unbounded) singletons are represented as
1453            // a stream which produces all values from all ticks every tick,
1454            // so Unpersist will always give the lastest aggregation
1455            core = HydroNode::Persist {
1456                inner: Box::new(core),
1457                metadata: self.location.new_node_metadata::<A>(),
1458            };
1459        }
1460
1461        Singleton::new(self.location, core)
1462    }
1463
1464    /// Collects all the elements of this stream into a single [`Vec`] element.
1465    ///
1466    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1467    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1468    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1469    /// the vector at an arbitrary point in time.
1470    ///
1471    /// # Example
1472    /// ```rust
1473    /// # use hydro_lang::prelude::*;
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476    /// let tick = process.tick();
1477    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1479    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1480    /// # }, |mut stream| async move {
1481    /// // [ vec![1, 2, 3, 4] ]
1482    /// # for w in vec![vec![1, 2, 3, 4]] {
1483    /// #     assert_eq!(stream.next().await.unwrap(), w);
1484    /// # }
1485    /// # }));
1486    /// ```
1487    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1488        self.fold(
1489            q!(|| vec![]),
1490            q!(|acc, v| {
1491                acc.push(v);
1492            }),
1493        )
1494    }
1495
1496    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1497    /// and emitting each intermediate result.
1498    ///
1499    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1500    /// containing all intermediate accumulated values. The scan operation can also terminate early
1501    /// by returning `None`.
1502    ///
1503    /// The function takes a mutable reference to the accumulator and the current element, and returns
1504    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1505    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1506    ///
1507    /// # Examples
1508    ///
1509    /// Basic usage - running sum:
1510    /// ```rust
1511    /// # use hydro_lang::prelude::*;
1512    /// # use futures::StreamExt;
1513    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1515    ///     q!(|| 0),
1516    ///     q!(|acc, x| {
1517    ///         *acc += x;
1518    ///         Some(*acc)
1519    ///     }),
1520    /// )
1521    /// # }, |mut stream| async move {
1522    /// // Output: 1, 3, 6, 10
1523    /// # for w in vec![1, 3, 6, 10] {
1524    /// #     assert_eq!(stream.next().await.unwrap(), w);
1525    /// # }
1526    /// # }));
1527    /// ```
1528    ///
1529    /// Early termination example:
1530    /// ```rust
1531    /// # use hydro_lang::prelude::*;
1532    /// # use futures::StreamExt;
1533    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1534    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1535    ///     q!(|| 1),
1536    ///     q!(|state, x| {
1537    ///         *state = *state * x;
1538    ///         if *state > 6 {
1539    ///             None // Terminate the stream
1540    ///         } else {
1541    ///             Some(-*state)
1542    ///         }
1543    ///     }),
1544    /// )
1545    /// # }, |mut stream| async move {
1546    /// // Output: -1, -2, -6
1547    /// # for w in vec![-1, -2, -6] {
1548    /// #     assert_eq!(stream.next().await.unwrap(), w);
1549    /// # }
1550    /// # }));
1551    /// ```
1552    pub fn scan<A, U, I, F>(
1553        self,
1554        init: impl IntoQuotedMut<'a, I, L>,
1555        f: impl IntoQuotedMut<'a, F, L>,
1556    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1557    where
1558        I: Fn() -> A + 'a,
1559        F: Fn(&mut A, T) -> Option<U> + 'a,
1560    {
1561        let init = init.splice_fn0_ctx(&self.location).into();
1562        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1563
1564        if L::is_top_level() {
1565            Stream::new(
1566                self.location.clone(),
1567                HydroNode::Persist {
1568                    inner: Box::new(HydroNode::Scan {
1569                        init,
1570                        acc: f,
1571                        input: Box::new(self.ir_node.into_inner()),
1572                        metadata: self.location.new_node_metadata::<U>(),
1573                    }),
1574                    metadata: self.location.new_node_metadata::<U>(),
1575                },
1576            )
1577        } else {
1578            Stream::new(
1579                self.location.clone(),
1580                HydroNode::Scan {
1581                    init,
1582                    acc: f,
1583                    input: Box::new(self.ir_node.into_inner()),
1584                    metadata: self.location.new_node_metadata::<U>(),
1585                },
1586            )
1587        }
1588    }
1589
1590    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1591    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1592    /// until the first element in the input arrives.
1593    ///
1594    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1595    /// to depend on the order of elements in the stream.
1596    ///
1597    /// # Example
1598    /// ```rust
1599    /// # use hydro_lang::prelude::*;
1600    /// # use futures::StreamExt;
1601    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1602    /// let tick = process.tick();
1603    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1604    /// let batch = words.batch(&tick, nondet!(/** test */));
1605    /// batch
1606    ///     .map(q!(|x| x.to_string()))
1607    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1608    ///     .all_ticks()
1609    /// # }, |mut stream| async move {
1610    /// // "HELLOWORLD"
1611    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1612    /// # }));
1613    /// ```
1614    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1615        self,
1616        comb: impl IntoQuotedMut<'a, F, L>,
1617    ) -> Optional<T, L, B> {
1618        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1619        let mut core = HydroNode::Reduce {
1620            f,
1621            input: Box::new(self.ir_node.into_inner()),
1622            metadata: self.location.new_node_metadata::<T>(),
1623        };
1624
1625        if L::is_top_level() {
1626            core = HydroNode::Persist {
1627                inner: Box::new(core),
1628                metadata: self.location.new_node_metadata::<T>(),
1629            };
1630        }
1631
1632        Optional::new(self.location, core)
1633    }
1634}
1635
1636impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
1637    Stream<T, L, Unbounded, O, R>
1638{
1639    /// Produces a new stream that interleaves the elements of the two input streams.
1640    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1641    ///
1642    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1643    /// [`Bounded`], you can use [`Stream::chain`] instead.
1644    ///
1645    /// # Example
1646    /// ```rust
1647    /// # use hydro_lang::prelude::*;
1648    /// # use futures::StreamExt;
1649    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1650    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1651    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1652    /// # }, |mut stream| async move {
1653    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1654    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1655    /// #     assert_eq!(stream.next().await.unwrap(), w);
1656    /// # }
1657    /// # }));
1658    /// ```
1659    pub fn interleave<O2: Ordering, R2: Retries>(
1660        self,
1661        other: Stream<T, L, Unbounded, O2, R2>,
1662    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1663    where
1664        R: MinRetries<R2>,
1665    {
1666        let tick = self.location.tick();
1667        // Because the outputs are unordered, we can interleave batches from both streams.
1668        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1669        self.batch(&tick, nondet_batch_interleaving)
1670            .weakest_ordering()
1671            .chain(
1672                other
1673                    .batch(&tick, nondet_batch_interleaving)
1674                    .weakest_ordering(),
1675            )
1676            .all_ticks()
1677    }
1678}
1679
1680impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1681where
1682    L: Location<'a>,
1683{
1684    /// Produces a new stream that emits the input elements in sorted order.
1685    ///
1686    /// The input stream can have any ordering guarantee, but the output stream
1687    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1688    /// elements in the input stream are available, so it requires the input stream
1689    /// to be [`Bounded`].
1690    ///
1691    /// # Example
1692    /// ```rust
1693    /// # use hydro_lang::prelude::*;
1694    /// # use futures::StreamExt;
1695    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1696    /// let tick = process.tick();
1697    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1698    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1699    /// batch.sort().all_ticks()
1700    /// # }, |mut stream| async move {
1701    /// // 1, 2, 3, 4
1702    /// # for w in (1..5) {
1703    /// #     assert_eq!(stream.next().await.unwrap(), w);
1704    /// # }
1705    /// # }));
1706    /// ```
1707    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1708    where
1709        T: Ord,
1710    {
1711        Stream::new(
1712            self.location.clone(),
1713            HydroNode::Sort {
1714                input: Box::new(self.ir_node.into_inner()),
1715                metadata: self.location.new_node_metadata::<T>(),
1716            },
1717        )
1718    }
1719
1720    /// Produces a new stream that first emits the elements of the `self` stream,
1721    /// and then emits the elements of the `other` stream. The output stream has
1722    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1723    /// [`TotalOrder`] guarantee.
1724    ///
1725    /// Currently, both input streams must be [`Bounded`]. This operator will block
1726    /// on the first stream until all its elements are available. In a future version,
1727    /// we will relax the requirement on the `other` stream.
1728    ///
1729    /// # Example
1730    /// ```rust
1731    /// # use hydro_lang::prelude::*;
1732    /// # use futures::StreamExt;
1733    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1734    /// let tick = process.tick();
1735    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1736    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1737    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1738    /// # }, |mut stream| async move {
1739    /// // 2, 3, 4, 5, 1, 2, 3, 4
1740    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1741    /// #     assert_eq!(stream.next().await.unwrap(), w);
1742    /// # }
1743    /// # }));
1744    /// ```
1745    pub fn chain<O2: Ordering, R2: Retries>(
1746        self,
1747        other: Stream<T, L, Bounded, O2, R2>,
1748    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1749    where
1750        O: MinOrder<O2>,
1751        R: MinRetries<R2>,
1752    {
1753        check_matching_location(&self.location, &other.location);
1754
1755        Stream::new(
1756            self.location.clone(),
1757            HydroNode::Chain {
1758                first: Box::new(self.ir_node.into_inner()),
1759                second: Box::new(other.ir_node.into_inner()),
1760                metadata: self.location.new_node_metadata::<T>(),
1761            },
1762        )
1763    }
1764
1765    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1766    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1767    /// because this is compiled into a nested loop.
1768    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1769        self,
1770        other: Stream<T2, L, Bounded, O2, R>,
1771    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1772    where
1773        T: Clone,
1774        T2: Clone,
1775    {
1776        check_matching_location(&self.location, &other.location);
1777
1778        Stream::new(
1779            self.location.clone(),
1780            HydroNode::CrossProduct {
1781                left: Box::new(self.ir_node.into_inner()),
1782                right: Box::new(other.ir_node.into_inner()),
1783                metadata: self.location.new_node_metadata::<(T, T2)>(),
1784            },
1785        )
1786    }
1787
1788    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1789    /// `self` used as the values for *each* key.
1790    ///
1791    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1792    /// values. For example, it can be used to send the same set of elements to several cluster
1793    /// members, if the membership information is available as a [`KeyedSingleton`].
1794    ///
1795    /// # Example
1796    /// ```rust
1797    /// # use hydro_lang::prelude::*;
1798    /// # use futures::StreamExt;
1799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1800    /// # let tick = process.tick();
1801    /// let keyed_singleton = // { 1: (), 2: () }
1802    /// # process
1803    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1804    /// #     .into_keyed()
1805    /// #     .batch(&tick, nondet!(/** test */))
1806    /// #     .first();
1807    /// let stream = // [ "a", "b" ]
1808    /// # process
1809    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1810    /// #     .batch(&tick, nondet!(/** test */));
1811    /// stream.repeat_with_keys(keyed_singleton)
1812    /// # .entries().all_ticks()
1813    /// # }, |mut stream| async move {
1814    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1815    /// # let mut results = Vec::new();
1816    /// # for _ in 0..4 {
1817    /// #     results.push(stream.next().await.unwrap());
1818    /// # }
1819    /// # results.sort();
1820    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1821    /// # }));
1822    /// ```
1823    pub fn repeat_with_keys<K, V2>(
1824        self,
1825        keys: KeyedSingleton<K, V2, L, Bounded>,
1826    ) -> KeyedStream<K, T, L, Bounded, O, R>
1827    where
1828        K: Clone,
1829        T: Clone,
1830    {
1831        keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering(
1832            nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
1833        )
1834    }
1835}
1836
1837impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1838where
1839    L: Location<'a>,
1840{
1841    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1842    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1843    /// by equi-joining the two streams on the key attribute `K`.
1844    ///
1845    /// # Example
1846    /// ```rust
1847    /// # use hydro_lang::prelude::*;
1848    /// # use std::collections::HashSet;
1849    /// # use futures::StreamExt;
1850    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1851    /// let tick = process.tick();
1852    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1853    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1854    /// stream1.join(stream2)
1855    /// # }, |mut stream| async move {
1856    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1857    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1858    /// # stream.map(|i| assert!(expected.contains(&i)));
1859    /// # }));
1860    pub fn join<V2, O2: Ordering, R2: Retries>(
1861        self,
1862        n: Stream<(K, V2), L, B, O2, R2>,
1863    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1864    where
1865        K: Eq + Hash,
1866        R: MinRetries<R2>,
1867    {
1868        check_matching_location(&self.location, &n.location);
1869
1870        Stream::new(
1871            self.location.clone(),
1872            HydroNode::Join {
1873                left: Box::new(self.ir_node.into_inner()),
1874                right: Box::new(n.ir_node.into_inner()),
1875                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1876            },
1877        )
1878    }
1879
1880    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1881    /// computes the anti-join of the items in the input -- i.e. returns
1882    /// unique items in the first input that do not have a matching key
1883    /// in the second input.
1884    ///
1885    /// # Example
1886    /// ```rust
1887    /// # use hydro_lang::prelude::*;
1888    /// # use futures::StreamExt;
1889    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1890    /// let tick = process.tick();
1891    /// let stream = process
1892    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1893    ///   .batch(&tick, nondet!(/** test */));
1894    /// let batch = process
1895    ///   .source_iter(q!(vec![1, 2]))
1896    ///   .batch(&tick, nondet!(/** test */));
1897    /// stream.anti_join(batch).all_ticks()
1898    /// # }, |mut stream| async move {
1899    /// # for w in vec![(3, 'c'), (4, 'd')] {
1900    /// #     assert_eq!(stream.next().await.unwrap(), w);
1901    /// # }
1902    /// # }));
1903    pub fn anti_join<O2: Ordering, R2: Retries>(
1904        self,
1905        n: Stream<K, L, Bounded, O2, R2>,
1906    ) -> Stream<(K, V1), L, B, O, R>
1907    where
1908        K: Eq + Hash,
1909    {
1910        check_matching_location(&self.location, &n.location);
1911
1912        Stream::new(
1913            self.location.clone(),
1914            HydroNode::AntiJoin {
1915                pos: Box::new(self.ir_node.into_inner()),
1916                neg: Box::new(n.ir_node.into_inner()),
1917                metadata: self.location.new_node_metadata::<(K, V1)>(),
1918            },
1919        )
1920    }
1921}
1922
1923impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1924    Stream<(K, V), L, B, O, R>
1925{
1926    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1927    /// is used as the key and the second element is added to the entries associated with that key.
1928    ///
1929    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1930    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1931    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1932    /// total ordering _within_ each group but no ordering _across_ groups.
1933    ///
1934    /// # Example
1935    /// ```rust
1936    /// # use hydro_lang::prelude::*;
1937    /// # use futures::StreamExt;
1938    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1939    /// process
1940    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1941    ///     .into_keyed()
1942    /// #   .entries()
1943    /// # }, |mut stream| async move {
1944    /// // { 1: [2, 3], 2: [4] }
1945    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1946    /// #     assert_eq!(stream.next().await.unwrap(), w);
1947    /// # }
1948    /// # }));
1949    /// ```
1950    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1951        KeyedStream {
1952            underlying: self.weakest_ordering(),
1953            _phantom_order: Default::default(),
1954        }
1955    }
1956}
1957
1958impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1959where
1960    K: Eq + Hash,
1961    L: Location<'a>,
1962{
1963    #[deprecated = "use .into_keyed().fold(...) instead"]
1964    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1965    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1966    /// in the second element are accumulated via the `comb` closure.
1967    ///
1968    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1969    /// to depend on the order of elements in the stream.
1970    ///
1971    /// If the input and output value types are the same and do not require initialization then use
1972    /// [`Stream::reduce_keyed`].
1973    ///
1974    /// # Example
1975    /// ```rust
1976    /// # use hydro_lang::prelude::*;
1977    /// # use futures::StreamExt;
1978    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1979    /// let tick = process.tick();
1980    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1981    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1982    /// batch
1983    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1984    ///     .all_ticks()
1985    /// # }, |mut stream| async move {
1986    /// // (1, 5), (2, 7)
1987    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1988    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1989    /// # }));
1990    /// ```
1991    pub fn fold_keyed<A, I, F>(
1992        self,
1993        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1994        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1995    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1996    where
1997        I: Fn() -> A + 'a,
1998        F: Fn(&mut A, V) + 'a,
1999    {
2000        self.into_keyed().fold(init, comb).entries()
2001    }
2002
2003    #[deprecated = "use .into_keyed().reduce(...) instead"]
2004    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2005    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2006    /// in the second element are accumulated via the `comb` closure.
2007    ///
2008    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2009    /// to depend on the order of elements in the stream.
2010    ///
2011    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2012    ///
2013    /// # Example
2014    /// ```rust
2015    /// # use hydro_lang::prelude::*;
2016    /// # use futures::StreamExt;
2017    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2018    /// let tick = process.tick();
2019    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2020    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2021    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2022    /// # }, |mut stream| async move {
2023    /// // (1, 5), (2, 7)
2024    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2025    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2026    /// # }));
2027    /// ```
2028    pub fn reduce_keyed<F>(
2029        self,
2030        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2031    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2032    where
2033        F: Fn(&mut V, V) + 'a,
2034    {
2035        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2036
2037        Stream::new(
2038            self.location.clone(),
2039            HydroNode::ReduceKeyed {
2040                f,
2041                input: Box::new(self.ir_node.into_inner()),
2042                metadata: self.location.new_node_metadata::<(K, V)>(),
2043            },
2044        )
2045    }
2046}
2047
2048impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2049where
2050    K: Eq + Hash,
2051    L: Location<'a>,
2052{
2053    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2054    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2055    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2056    /// in the second element are accumulated via the `comb` closure.
2057    ///
2058    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2059    /// as there may be non-deterministic duplicates.
2060    ///
2061    /// If the input and output value types are the same and do not require initialization then use
2062    /// [`Stream::reduce_keyed_commutative_idempotent`].
2063    ///
2064    /// # Example
2065    /// ```rust
2066    /// # use hydro_lang::prelude::*;
2067    /// # use futures::StreamExt;
2068    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2069    /// let tick = process.tick();
2070    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2071    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2072    /// batch
2073    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2074    ///     .all_ticks()
2075    /// # }, |mut stream| async move {
2076    /// // (1, false), (2, true)
2077    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2078    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2079    /// # }));
2080    /// ```
2081    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2082        self,
2083        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2084        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2085    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2086    where
2087        I: Fn() -> A + 'a,
2088        F: Fn(&mut A, V) + 'a,
2089    {
2090        self.into_keyed()
2091            .fold_commutative_idempotent(init, comb)
2092            .entries()
2093    }
2094
2095    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2096    /// # Example
2097    /// ```rust
2098    /// # use hydro_lang::prelude::*;
2099    /// # use futures::StreamExt;
2100    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2101    /// let tick = process.tick();
2102    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2103    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2104    /// batch.keys().all_ticks()
2105    /// # }, |mut stream| async move {
2106    /// // 1, 2
2107    /// # assert_eq!(stream.next().await.unwrap(), 1);
2108    /// # assert_eq!(stream.next().await.unwrap(), 2);
2109    /// # }));
2110    /// ```
2111    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2112        self.into_keyed()
2113            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2114            .keys()
2115    }
2116
2117    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2118    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2119    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2120    /// in the second element are accumulated via the `comb` closure.
2121    ///
2122    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2123    /// as there may be non-deterministic duplicates.
2124    ///
2125    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2126    ///
2127    /// # Example
2128    /// ```rust
2129    /// # use hydro_lang::prelude::*;
2130    /// # use futures::StreamExt;
2131    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2132    /// let tick = process.tick();
2133    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2134    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2135    /// batch
2136    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2137    ///     .all_ticks()
2138    /// # }, |mut stream| async move {
2139    /// // (1, false), (2, true)
2140    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2141    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2142    /// # }));
2143    /// ```
2144    pub fn reduce_keyed_commutative_idempotent<F>(
2145        self,
2146        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2147    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2148    where
2149        F: Fn(&mut V, V) + 'a,
2150    {
2151        self.into_keyed()
2152            .reduce_commutative_idempotent(comb)
2153            .entries()
2154    }
2155}
2156
2157impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2158where
2159    K: Eq + Hash,
2160    L: Location<'a>,
2161{
2162    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2163    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2164    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2165    /// in the second element are accumulated via the `comb` closure.
2166    ///
2167    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2168    ///
2169    /// If the input and output value types are the same and do not require initialization then use
2170    /// [`Stream::reduce_keyed_commutative`].
2171    ///
2172    /// # Example
2173    /// ```rust
2174    /// # use hydro_lang::prelude::*;
2175    /// # use futures::StreamExt;
2176    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2177    /// let tick = process.tick();
2178    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2179    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180    /// batch
2181    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2182    ///     .all_ticks()
2183    /// # }, |mut stream| async move {
2184    /// // (1, 5), (2, 7)
2185    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2186    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2187    /// # }));
2188    /// ```
2189    pub fn fold_keyed_commutative<A, I, F>(
2190        self,
2191        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2192        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2193    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2194    where
2195        I: Fn() -> A + 'a,
2196        F: Fn(&mut A, V) + 'a,
2197    {
2198        self.into_keyed().fold_commutative(init, comb).entries()
2199    }
2200
2201    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2202    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2203    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2204    /// in the second element are accumulated via the `comb` closure.
2205    ///
2206    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2207    ///
2208    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2209    ///
2210    /// # Example
2211    /// ```rust
2212    /// # use hydro_lang::prelude::*;
2213    /// # use futures::StreamExt;
2214    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2215    /// let tick = process.tick();
2216    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2217    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2218    /// batch
2219    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2220    ///     .all_ticks()
2221    /// # }, |mut stream| async move {
2222    /// // (1, 5), (2, 7)
2223    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2224    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2225    /// # }));
2226    /// ```
2227    pub fn reduce_keyed_commutative<F>(
2228        self,
2229        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2230    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2231    where
2232        F: Fn(&mut V, V) + 'a,
2233    {
2234        self.into_keyed().reduce_commutative(comb).entries()
2235    }
2236}
2237
2238impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2239where
2240    K: Eq + Hash,
2241    L: Location<'a>,
2242{
2243    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2244    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2245    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2246    /// in the second element are accumulated via the `comb` closure.
2247    ///
2248    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2249    ///
2250    /// If the input and output value types are the same and do not require initialization then use
2251    /// [`Stream::reduce_keyed_idempotent`].
2252    ///
2253    /// # Example
2254    /// ```rust
2255    /// # use hydro_lang::prelude::*;
2256    /// # use futures::StreamExt;
2257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258    /// let tick = process.tick();
2259    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2260    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2261    /// batch
2262    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2263    ///     .all_ticks()
2264    /// # }, |mut stream| async move {
2265    /// // (1, false), (2, true)
2266    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2267    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2268    /// # }));
2269    /// ```
2270    pub fn fold_keyed_idempotent<A, I, F>(
2271        self,
2272        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2273        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2274    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2275    where
2276        I: Fn() -> A + 'a,
2277        F: Fn(&mut A, V) + 'a,
2278    {
2279        self.into_keyed().fold_idempotent(init, comb).entries()
2280    }
2281
2282    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2283    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2284    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2285    /// in the second element are accumulated via the `comb` closure.
2286    ///
2287    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2288    ///
2289    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2290    ///
2291    /// # Example
2292    /// ```rust
2293    /// # use hydro_lang::prelude::*;
2294    /// # use futures::StreamExt;
2295    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2296    /// let tick = process.tick();
2297    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2298    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2299    /// batch
2300    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2301    ///     .all_ticks()
2302    /// # }, |mut stream| async move {
2303    /// // (1, false), (2, true)
2304    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2305    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2306    /// # }));
2307    /// ```
2308    pub fn reduce_keyed_idempotent<F>(
2309        self,
2310        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2311    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2312    where
2313        F: Fn(&mut V, V) + 'a,
2314    {
2315        self.into_keyed().reduce_idempotent(comb).entries()
2316    }
2317}
2318
2319impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2320where
2321    L: Location<'a> + NoTick,
2322{
2323    /// Returns a stream corresponding to the latest batch of elements being atomically
2324    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2325    /// the order of the input.
2326    ///
2327    /// # Non-Determinism
2328    /// The batch boundaries are non-deterministic and may change across executions.
2329    pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2330        Stream::new(
2331            self.location.clone().tick,
2332            HydroNode::Unpersist {
2333                inner: Box::new(self.ir_node.into_inner()),
2334                metadata: self.location.new_node_metadata::<T>(),
2335            },
2336        )
2337    }
2338
2339    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2340    /// See [`Stream::atomic`] for more details.
2341    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2342        Stream::new(self.location.tick.l, self.ir_node.into_inner())
2343    }
2344
2345    /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2346    pub fn atomic_source(&self) -> Tick<L> {
2347        self.location.tick.clone()
2348    }
2349}
2350
2351impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2352where
2353    L: Location<'a> + NoTick + NoAtomic,
2354{
2355    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2356    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2357    ///
2358    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2359    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2360    /// argument that declares where the stream will be atomically processed. Batching a stream into
2361    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2362    /// [`Tick`] will introduce asynchrony.
2363    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2364        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2365    }
2366
2367    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2368    /// Future outputs are produced as available, regardless of input arrival order.
2369    ///
2370    /// # Example
2371    /// ```rust
2372    /// # use std::collections::HashSet;
2373    /// # use futures::StreamExt;
2374    /// # use hydro_lang::prelude::*;
2375    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2376    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2377    ///     .map(q!(|x| async move {
2378    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2379    ///         x
2380    ///     }))
2381    ///     .resolve_futures()
2382    /// #   },
2383    /// #   |mut stream| async move {
2384    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2385    /// #       let mut output = HashSet::new();
2386    /// #       for _ in 1..10 {
2387    /// #           output.insert(stream.next().await.unwrap());
2388    /// #       }
2389    /// #       assert_eq!(
2390    /// #           output,
2391    /// #           HashSet::<i32>::from_iter(1..10)
2392    /// #       );
2393    /// #   },
2394    /// # ));
2395    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2396    where
2397        T: Future<Output = T2>,
2398    {
2399        Stream::new(
2400            self.location.clone(),
2401            HydroNode::ResolveFutures {
2402                input: Box::new(self.ir_node.into_inner()),
2403                metadata: self.location.new_node_metadata::<T2>(),
2404            },
2405        )
2406    }
2407
2408    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2409    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2410    /// the order of the input. The output stream will execute in the [`Tick`] that was
2411    /// used to create the atomic section.
2412    ///
2413    /// # Non-Determinism
2414    /// The batch boundaries are non-deterministic and may change across executions.
2415    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2416        self.atomic(tick).batch(nondet)
2417    }
2418
2419    /// Given a time interval, returns a stream corresponding to samples taken from the
2420    /// stream roughly at that interval. The output will have elements in the same order
2421    /// as the input, but with arbitrary elements skipped between samples. There is also
2422    /// no guarantee on the exact timing of the samples.
2423    ///
2424    /// # Non-Determinism
2425    /// The output stream is non-deterministic in which elements are sampled, since this
2426    /// is controlled by a clock.
2427    pub fn sample_every(
2428        self,
2429        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2430        nondet: NonDet,
2431    ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2432        let samples = self.location.source_interval(interval, nondet);
2433
2434        let tick = self.location.tick();
2435        self.batch(&tick, nondet)
2436            .filter_if_some(samples.batch(&tick, nondet).first())
2437            .all_ticks()
2438            .weakest_retries()
2439    }
2440
2441    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2442    /// stream has not emitted a value since that duration.
2443    ///
2444    /// # Non-Determinism
2445    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2446    /// samples take place, timeouts may be non-deterministically generated or missed,
2447    /// and the notification of the timeout may be delayed as well. There is also no
2448    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2449    /// detected based on when the next sample is taken.
2450    pub fn timeout(
2451        self,
2452        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2453        nondet: NonDet,
2454    ) -> Optional<(), L, Unbounded> {
2455        let tick = self.location.tick();
2456
2457        let latest_received = self.assume_retries(nondet).fold_commutative(
2458            q!(|| None),
2459            q!(|latest, _| {
2460                *latest = Some(Instant::now());
2461            }),
2462        );
2463
2464        latest_received
2465            .snapshot(&tick, nondet)
2466            .filter_map(q!(move |latest_received| {
2467                if let Some(latest_received) = latest_received {
2468                    if Instant::now().duration_since(latest_received) > duration {
2469                        Some(())
2470                    } else {
2471                        None
2472                    }
2473                } else {
2474                    Some(())
2475                }
2476            }))
2477            .latest()
2478    }
2479}
2480
2481impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2482where
2483    L: Location<'a> + NoTick + NoAtomic,
2484    F: Future<Output = T>,
2485{
2486    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2487    /// Future outputs are produced in the same order as the input stream.
2488    ///
2489    /// # Example
2490    /// ```rust
2491    /// # use std::collections::HashSet;
2492    /// # use futures::StreamExt;
2493    /// # use hydro_lang::prelude::*;
2494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2495    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2496    ///     .map(q!(|x| async move {
2497    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2498    ///         x
2499    ///     }))
2500    ///     .resolve_futures_ordered()
2501    /// #   },
2502    /// #   |mut stream| async move {
2503    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2504    /// #       let mut output = Vec::new();
2505    /// #       for _ in 1..10 {
2506    /// #           output.push(stream.next().await.unwrap());
2507    /// #       }
2508    /// #       assert_eq!(
2509    /// #           output,
2510    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2511    /// #       );
2512    /// #   },
2513    /// # ));
2514    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2515        Stream::new(
2516            self.location.clone(),
2517            HydroNode::ResolveFuturesOrdered {
2518                input: Box::new(self.ir_node.into_inner()),
2519                metadata: self.location.new_node_metadata::<T>(),
2520            },
2521        )
2522    }
2523}
2524
2525impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2526where
2527    L: Location<'a> + NoTick,
2528{
2529    /// Executes the provided closure for every element in this stream.
2530    ///
2531    /// Because the closure may have side effects, the stream must have deterministic order
2532    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2533    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2534    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2535    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2536        let f = f.splice_fn1_ctx(&self.location).into();
2537        let metadata = self.location.new_node_metadata::<T>();
2538        self.location
2539            .flow_state()
2540            .borrow_mut()
2541            .push_root(HydroRoot::ForEach {
2542                input: Box::new(HydroNode::Unpersist {
2543                    inner: Box::new(self.ir_node.into_inner()),
2544                    metadata: metadata.clone(),
2545                }),
2546                f,
2547                op_metadata: HydroIrOpMetadata::new(),
2548            });
2549    }
2550
2551    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2552    /// TCP socket to some other server. You should _not_ use this API for interacting with
2553    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2554    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2555    /// interaction with asynchronous sinks.
2556    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2557    where
2558        S: 'a + futures::Sink<T> + Unpin,
2559    {
2560        self.location
2561            .flow_state()
2562            .borrow_mut()
2563            .push_root(HydroRoot::DestSink {
2564                sink: sink.splice_typed_ctx(&self.location).into(),
2565                input: Box::new(self.ir_node.into_inner()),
2566                op_metadata: HydroIrOpMetadata::new(),
2567            });
2568    }
2569}
2570
2571impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2572where
2573    L: Location<'a>,
2574{
2575    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2576    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2577    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2578        Stream::new(
2579            self.location.outer().clone(),
2580            HydroNode::Persist {
2581                inner: Box::new(self.ir_node.into_inner()),
2582                metadata: self.location.new_node_metadata::<T>(),
2583            },
2584        )
2585    }
2586
2587    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2588    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2589    ///
2590    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2591    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2592    /// stream's [`Tick`] context.
2593    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2594        Stream::new(
2595            Atomic {
2596                tick: self.location.clone(),
2597            },
2598            HydroNode::Persist {
2599                inner: Box::new(self.ir_node.into_inner()),
2600                metadata: self.location.new_node_metadata::<T>(),
2601            },
2602        )
2603    }
2604
2605    /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2606    ///
2607    /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2608    /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2609    /// careful when using this operator, as its memory usage will grow linearly over time since it
2610    /// must store its inputs indefinitely.
2611    ///
2612    /// # Example
2613    /// ```rust
2614    /// # use hydro_lang::prelude::*;
2615    /// # use futures::StreamExt;
2616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2617    /// let tick = process.tick();
2618    /// // ticks are lazy by default, forces the second tick to run
2619    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2620    ///
2621    /// let batch_first_tick = process
2622    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2623    ///   .batch(&tick, nondet!(/** test */));
2624    /// let batch_second_tick = process
2625    ///   .source_iter(q!(vec![5, 6, 7, 8]))
2626    ///   .batch(&tick, nondet!(/** test */))
2627    ///   .defer_tick(); // appears on the second tick
2628    /// batch_first_tick.chain(batch_second_tick)
2629    ///   .persist()
2630    ///   .all_ticks()
2631    /// # }, |mut stream| async move {
2632    /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2633    /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2634    /// #     assert_eq!(stream.next().await.unwrap(), w);
2635    /// # }
2636    /// # }));
2637    /// ```
2638    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2639    where
2640        T: Clone,
2641    {
2642        Stream::new(
2643            self.location.clone(),
2644            HydroNode::Persist {
2645                inner: Box::new(self.ir_node.into_inner()),
2646                metadata: self.location.new_node_metadata::<T>(),
2647            },
2648        )
2649    }
2650
2651    #[expect(missing_docs, reason = "TODO")]
2652    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2653        Stream::new(
2654            self.location.clone(),
2655            HydroNode::DeferTick {
2656                input: Box::new(self.ir_node.into_inner()),
2657                metadata: self.location.new_node_metadata::<T>(),
2658            },
2659        )
2660    }
2661}
2662
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    mod backtrace_chained_ops;

    // Marker types used to tell the two processes of the distributed test apart.
    struct P1 {}
    struct P2 {}

    // Payload that is serialized with bincode when hopping between locations.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // Sends 0..10 from one process through a second one to an external observer and
    // checks that the values arrive in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let sender = builder.process::<P1>();
        let relay = builder.process::<P2>();
        let observer = builder.external::<P2>();

        let payloads = sender.source_iter(q!(0..10));
        let out_port = payloads
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&relay)
            .send_bincode_external(&observer);

        let nodes = builder
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output port before starting so that no element is missed.
        let mut received = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let message = received.next().await.unwrap();
            assert_eq!(message.n, expected);
        }
    }

    // Taking `first()` of a three-element batch must yield exactly one element.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let observer = builder.external::<()>();

        let tick = process.tick();
        let count_port = tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&observer);

        let nodes = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut counts = nodes.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    // A `scan` over an unbounded input must keep its accumulator between inputs that
    // arrive at different times.
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let observer = builder.external::<()>();

        let (input_port, input) = process.source_external_bincode(&observer);
        let running_sum = input.scan(
            q!(|| 0),
            q!(|acc, v| {
                *acc += v;
                Some(*acc)
            }),
        );
        let output_port = running_sum.send_bincode_external(&observer);

        let nodes = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = nodes.connect_sink_bincode(input_port).await;
        let mut from_process = nodes.connect_source_bincode(output_port).await;

        deployment.start().await.unwrap();

        // First input: the running sum is just the input itself.
        to_process.send(1).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), 1);

        // Second input: 1 + 2, so the accumulator must have survived the first round trip.
        to_process.send(2).await.unwrap();
        assert_eq!(from_process.next().await.unwrap(), 3);
    }
}