hydro_lang/
stream.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::rc::Rc;
8
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10use syn::parse_quote;
11use tokio::time::Instant;
12
13use crate::boundedness::Boundedness;
14use crate::builder::FLOW_USED_MESSAGE;
15use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
16use crate::ir::{HydroLeaf, HydroNode, TeeNode};
17use crate::keyed_stream::KeyedStream;
18use crate::location::tick::{Atomic, NoAtomic};
19use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
20use crate::manual_expr::ManualExpr;
21use crate::unsafety::NonDet;
22use crate::*;
23
24pub mod networking;
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited marker type: it has no variants and is only used
/// as a type parameter (it is the default `Order` of [`Stream`]).
pub enum TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Like [`TotalOrder`], this is an uninhabited marker type that only ever
/// appears as a type parameter.
pub enum NoOrder {}
37
/// Helper trait for determining the weakest of two orderings.
///
/// Implemented for pairs of the ordering markers [`TotalOrder`] and
/// [`NoOrder`]; the trait is sealed, so the set of implementations is fixed
/// by this module.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}
44
// Reflexive case: the weaker of an ordering and itself is that same ordering.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}

// Mixed case: `NoOrder` is weaker than `TotalOrder`.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// Mixed case with the operands swapped; the result is still `NoOrder`.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
59
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited marker type that only ever appears as a type
/// parameter (it is the default `Retries` of [`Stream`]).
pub enum ExactlyOnce {}
63
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// Like [`ExactlyOnce`], this is an uninhabited marker type that only ever
/// appears as a type parameter.
pub enum AtLeastOnce {}
67
/// Helper trait for determining the weakest of two retry guarantees.
///
/// Implemented for pairs of the retry markers [`ExactlyOnce`] and
/// [`AtLeastOnce`]; the trait is sealed, so the set of implementations is
/// fixed by this module.
#[sealed::sealed]
pub trait MinRetries<Other> {
    /// The weaker of the two retry guarantees.
    type Min;
}
74
// Reflexive case: the weaker of a guarantee and itself is that same guarantee.
#[sealed::sealed]
impl<T> MinRetries<T> for T {
    type Min = T;
}

// Mixed case: `AtLeastOnce` is weaker than `ExactlyOnce`.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// Mixed case with the operands swapped; the result is still `AtLeastOnce`.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}
89
/// A stream (sequence) of elements of type `Type`.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either
///   [`ExactlyOnce`] or [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<Type, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> {
    // The location this stream belongs to.
    pub(crate) location: Loc,
    // The IR node producing this stream. Kept in a `RefCell` so that it can be
    // rewritten through a shared reference (see the `Clone` impl, which swaps
    // the node for a `Tee`).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
}
105
106impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
107where
108    L: Location<'a>,
109{
110    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
111        Stream {
112            location: stream.location,
113            ir_node: stream.ir_node,
114            _phantom: PhantomData,
115        }
116    }
117}
118
119impl<'a, T, L, B: Boundedness, R> From<Stream<T, L, B, TotalOrder, R>>
120    for Stream<T, L, B, NoOrder, R>
121where
122    L: Location<'a>,
123{
124    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
125        Stream {
126            location: stream.location,
127            ir_node: stream.ir_node,
128            _phantom: PhantomData,
129        }
130    }
131}
132
133impl<'a, T, L, B: Boundedness, O> From<Stream<T, L, B, O, ExactlyOnce>>
134    for Stream<T, L, B, O, AtLeastOnce>
135where
136    L: Location<'a>,
137{
138    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
139        Stream {
140            location: stream.location,
141            ir_node: stream.ir_node,
142            _phantom: PhantomData,
143        }
144    }
145}
146
// Implements `DeferTick` for bounded streams that live inside a `Tick`.
impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this presumably resolves to an inherent
        // `Stream::defer_tick` defined elsewhere in the file (inherent methods
        // take priority over trait methods in path resolution); if no such
        // method exists this call would recurse into itself — confirm.
        Stream::defer_tick(self)
    }
}
155
156impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
157where
158    L: Location<'a>,
159{
160    type Location = Tick<L>;
161
162    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
163        Stream::new(
164            location.clone(),
165            HydroNode::CycleSource {
166                ident,
167                metadata: location.new_node_metadata::<T>(),
168            },
169        )
170    }
171}
172
173impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
174where
175    L: Location<'a>,
176{
177    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
178        assert_eq!(
179            self.location.id(),
180            expected_location,
181            "locations do not match"
182        );
183        self.location
184            .flow_state()
185            .borrow_mut()
186            .leaves
187            .as_mut()
188            .expect(FLOW_USED_MESSAGE)
189            .push(HydroLeaf::CycleSink {
190                ident,
191                input: Box::new(self.ir_node.into_inner()),
192                metadata: self.location.new_node_metadata::<T>(),
193            });
194    }
195}
196
197impl<'a, T, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
198where
199    L: Location<'a> + NoTick,
200{
201    type Location = L;
202
203    fn create_source(ident: syn::Ident, location: L) -> Self {
204        Stream::new(
205            location.clone(),
206            HydroNode::Persist {
207                inner: Box::new(HydroNode::CycleSource {
208                    ident,
209                    metadata: location.new_node_metadata::<T>(),
210                }),
211                metadata: location.new_node_metadata::<T>(),
212            },
213        )
214    }
215}
216
217impl<'a, T, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
218where
219    L: Location<'a> + NoTick,
220{
221    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
222        assert_eq!(
223            self.location.id(),
224            expected_location,
225            "locations do not match"
226        );
227        let metadata = self.location.new_node_metadata::<T>();
228        self.location
229            .flow_state()
230            .borrow_mut()
231            .leaves
232            .as_mut()
233            .expect(FLOW_USED_MESSAGE)
234            .push(HydroLeaf::CycleSink {
235                ident,
236                input: Box::new(HydroNode::Unpersist {
237                    inner: Box::new(self.ir_node.into_inner()),
238                    metadata: metadata.clone(),
239                }),
240                metadata,
241            });
242    }
243}
244
245impl<'a, T, L, B: Boundedness, O, R> Clone for Stream<T, L, B, O, R>
246where
247    T: Clone,
248    L: Location<'a>,
249{
250    fn clone(&self) -> Self {
251        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
252            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
253            *self.ir_node.borrow_mut() = HydroNode::Tee {
254                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
255                metadata: self.location.new_node_metadata::<T>(),
256            };
257        }
258
259        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
260            Stream {
261                location: self.location.clone(),
262                ir_node: HydroNode::Tee {
263                    inner: TeeNode(inner.0.clone()),
264                    metadata: metadata.clone(),
265                }
266                .into(),
267                _phantom: PhantomData,
268            }
269        } else {
270            unreachable!()
271        }
272    }
273}
274
275impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
276where
277    L: Location<'a>,
278{
279    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
280        Stream {
281            location,
282            ir_node: RefCell::new(ir_node),
283            _phantom: PhantomData,
284        }
285    }
286
287    /// Produces a stream based on invoking `f` on each element.
288    /// If you do not want to modify the stream and instead only want to view
289    /// each item use [`Stream::inspect`] instead.
290    ///
291    /// # Example
292    /// ```rust
293    /// # use hydro_lang::*;
294    /// # use futures::StreamExt;
295    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
296    /// let words = process.source_iter(q!(vec!["hello", "world"]));
297    /// words.map(q!(|x| x.to_uppercase()))
298    /// # }, |mut stream| async move {
299    /// # for w in vec!["HELLO", "WORLD"] {
300    /// #     assert_eq!(stream.next().await.unwrap(), w);
301    /// # }
302    /// # }));
303    /// ```
304    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
305    where
306        F: Fn(T) -> U + 'a,
307    {
308        let f = f.splice_fn1_ctx(&self.location).into();
309        Stream::new(
310            self.location.clone(),
311            HydroNode::Map {
312                f,
313                input: Box::new(self.ir_node.into_inner()),
314                metadata: self.location.new_node_metadata::<U>(),
315            },
316        )
317    }
318
319    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
320    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
321    /// for the output type `U` must produce items in a **deterministic** order.
322    ///
323    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
324    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
325    ///
326    /// # Example
327    /// ```rust
328    /// # use hydro_lang::*;
329    /// # use futures::StreamExt;
330    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
331    /// process
332    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
333    ///     .flat_map_ordered(q!(|x| x))
334    /// # }, |mut stream| async move {
335    /// // 1, 2, 3, 4
336    /// # for w in (1..5) {
337    /// #     assert_eq!(stream.next().await.unwrap(), w);
338    /// # }
339    /// # }));
340    /// ```
341    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
342    where
343        I: IntoIterator<Item = U>,
344        F: Fn(T) -> I + 'a,
345    {
346        let f = f.splice_fn1_ctx(&self.location).into();
347        Stream::new(
348            self.location.clone(),
349            HydroNode::FlatMap {
350                f,
351                input: Box::new(self.ir_node.into_inner()),
352                metadata: self.location.new_node_metadata::<U>(),
353            },
354        )
355    }
356
357    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
358    /// for the output type `U` to produce items in any order.
359    ///
360    /// # Example
361    /// ```rust
362    /// # use hydro_lang::{*, stream::ExactlyOnce};
363    /// # use futures::StreamExt;
364    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
365    /// process
366    ///     .source_iter(q!(vec![
367    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
368    ///         std::collections::HashSet::from_iter(vec![3, 4]),
369    ///     ]))
370    ///     .flat_map_unordered(q!(|x| x))
371    /// # }, |mut stream| async move {
372    /// // 1, 2, 3, 4, but in no particular order
373    /// # let mut results = Vec::new();
374    /// # for w in (1..5) {
375    /// #     results.push(stream.next().await.unwrap());
376    /// # }
377    /// # results.sort();
378    /// # assert_eq!(results, vec![1, 2, 3, 4]);
379    /// # }));
380    /// ```
381    pub fn flat_map_unordered<U, I, F>(
382        self,
383        f: impl IntoQuotedMut<'a, F, L>,
384    ) -> Stream<U, L, B, NoOrder, R>
385    where
386        I: IntoIterator<Item = U>,
387        F: Fn(T) -> I + 'a,
388    {
389        let f = f.splice_fn1_ctx(&self.location).into();
390        Stream::new(
391            self.location.clone(),
392            HydroNode::FlatMap {
393                f,
394                input: Box::new(self.ir_node.into_inner()),
395                metadata: self.location.new_node_metadata::<U>(),
396            },
397        )
398    }
399
400    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
401    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
402    ///
403    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
404    /// not deterministic, use [`Stream::flatten_unordered`] instead.
405    ///
406    /// ```rust
407    /// # use hydro_lang::*;
408    /// # use futures::StreamExt;
409    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
410    /// process
411    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
412    ///     .flatten_ordered()
413    /// # }, |mut stream| async move {
414    /// // 1, 2, 3, 4
415    /// # for w in (1..5) {
416    /// #     assert_eq!(stream.next().await.unwrap(), w);
417    /// # }
418    /// # }));
419    /// ```
420    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
421    where
422        T: IntoIterator<Item = U>,
423    {
424        self.flat_map_ordered(q!(|d| d))
425    }
426
427    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
428    /// for the element type `T` to produce items in any order.
429    ///
430    /// # Example
431    /// ```rust
432    /// # use hydro_lang::{*, stream::ExactlyOnce};
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
435    /// process
436    ///     .source_iter(q!(vec![
437    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
438    ///         std::collections::HashSet::from_iter(vec![3, 4]),
439    ///     ]))
440    ///     .flatten_unordered()
441    /// # }, |mut stream| async move {
442    /// // 1, 2, 3, 4, but in no particular order
443    /// # let mut results = Vec::new();
444    /// # for w in (1..5) {
445    /// #     results.push(stream.next().await.unwrap());
446    /// # }
447    /// # results.sort();
448    /// # assert_eq!(results, vec![1, 2, 3, 4]);
449    /// # }));
450    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
451    where
452        T: IntoIterator<Item = U>,
453    {
454        self.flat_map_unordered(q!(|d| d))
455    }
456
457    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
458    /// `f`, preserving the order of the elements.
459    ///
460    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
461    /// not modify or take ownership of the values. If you need to modify the values while filtering
462    /// use [`Stream::filter_map`] instead.
463    ///
464    /// # Example
465    /// ```rust
466    /// # use hydro_lang::*;
467    /// # use futures::StreamExt;
468    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
469    /// process
470    ///     .source_iter(q!(vec![1, 2, 3, 4]))
471    ///     .filter(q!(|&x| x > 2))
472    /// # }, |mut stream| async move {
473    /// // 3, 4
474    /// # for w in (3..5) {
475    /// #     assert_eq!(stream.next().await.unwrap(), w);
476    /// # }
477    /// # }));
478    /// ```
479    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
480    where
481        F: Fn(&T) -> bool + 'a,
482    {
483        let f = f.splice_fn1_borrow_ctx(&self.location).into();
484        Stream::new(
485            self.location.clone(),
486            HydroNode::Filter {
487                f,
488                input: Box::new(self.ir_node.into_inner()),
489                metadata: self.location.new_node_metadata::<T>(),
490            },
491        )
492    }
493
494    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
495    ///
496    /// # Example
497    /// ```rust
498    /// # use hydro_lang::*;
499    /// # use futures::StreamExt;
500    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
501    /// process
502    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
503    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
504    /// # }, |mut stream| async move {
505    /// // 1, 2
506    /// # for w in (1..3) {
507    /// #     assert_eq!(stream.next().await.unwrap(), w);
508    /// # }
509    /// # }));
510    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
511    where
512        F: Fn(T) -> Option<U> + 'a,
513    {
514        let f = f.splice_fn1_ctx(&self.location).into();
515        Stream::new(
516            self.location.clone(),
517            HydroNode::FilterMap {
518                f,
519                input: Box::new(self.ir_node.into_inner()),
520                metadata: self.location.new_node_metadata::<U>(),
521            },
522        )
523    }
524
525    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
526    /// where `x` is the final value of `other`, a bounded [`Singleton`].
527    ///
528    /// # Example
529    /// ```rust
530    /// # use hydro_lang::*;
531    /// # use futures::StreamExt;
532    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
533    /// let tick = process.tick();
534    /// let batch = process
535    ///   .source_iter(q!(vec![1, 2, 3, 4]))
536    ///   .batch(&tick, nondet!(/** test */));
537    /// let count = batch.clone().count(); // `count()` returns a singleton
538    /// batch.cross_singleton(count).all_ticks()
539    /// # }, |mut stream| async move {
540    /// // (1, 4), (2, 4), (3, 4), (4, 4)
541    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
542    /// #     assert_eq!(stream.next().await.unwrap(), w);
543    /// # }
544    /// # }));
545    pub fn cross_singleton<O2>(
546        self,
547        other: impl Into<Optional<O2, L, Bounded>>,
548    ) -> Stream<(T, O2), L, B, O, R>
549    where
550        O2: Clone,
551    {
552        let other: Optional<O2, L, Bounded> = other.into();
553        check_matching_location(&self.location, &other.location);
554
555        Stream::new(
556            self.location.clone(),
557            HydroNode::CrossSingleton {
558                left: Box::new(self.ir_node.into_inner()),
559                right: Box::new(other.ir_node.into_inner()),
560                metadata: self.location.new_node_metadata::<(T, O2)>(),
561            },
562        )
563    }
564
565    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
566    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
567        self.cross_singleton(signal.map(q!(|_u| ())))
568            .map(q!(|(d, _signal)| d))
569    }
570
571    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
572    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
573        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
574    }
575
576    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
577    /// tupled pairs in a non-deterministic order.
578    ///
579    /// # Example
580    /// ```rust
581    /// # use hydro_lang::*;
582    /// # use std::collections::HashSet;
583    /// # use futures::StreamExt;
584    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
585    /// let tick = process.tick();
586    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
587    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
588    /// stream1.cross_product(stream2)
589    /// # }, |mut stream| async move {
590    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
591    /// # stream.map(|i| assert!(expected.contains(&i)));
592    /// # }));
593    pub fn cross_product<T2, O2>(
594        self,
595        other: Stream<T2, L, B, O2, R>,
596    ) -> Stream<(T, T2), L, B, NoOrder, R>
597    where
598        T: Clone,
599        T2: Clone,
600    {
601        check_matching_location(&self.location, &other.location);
602
603        Stream::new(
604            self.location.clone(),
605            HydroNode::CrossProduct {
606                left: Box::new(self.ir_node.into_inner()),
607                right: Box::new(other.ir_node.into_inner()),
608                metadata: self.location.new_node_metadata::<(T, T2)>(),
609            },
610        )
611    }
612
613    /// Takes one stream as input and filters out any duplicate occurrences. The output
614    /// contains all unique values from the input.
615    ///
616    /// # Example
617    /// ```rust
618    /// # use hydro_lang::*;
619    /// # use futures::StreamExt;
620    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
621    /// let tick = process.tick();
622    ///     process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
623    /// # }, |mut stream| async move {
624    /// # for w in vec![1, 2, 3, 4] {
625    /// #     assert_eq!(stream.next().await.unwrap(), w);
626    /// # }
627    /// # }));
628    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
629    where
630        T: Eq + Hash,
631    {
632        Stream::new(
633            self.location.clone(),
634            HydroNode::Unique {
635                input: Box::new(self.ir_node.into_inner()),
636                metadata: self.location.new_node_metadata::<T>(),
637            },
638        )
639    }
640
641    /// Outputs everything in this stream that is *not* contained in the `other` stream.
642    ///
643    /// The `other` stream must be [`Bounded`], since this function will wait until
644    /// all its elements are available before producing any output.
645    /// # Example
646    /// ```rust
647    /// # use hydro_lang::*;
648    /// # use futures::StreamExt;
649    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
650    /// let tick = process.tick();
651    /// let stream = process
652    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
653    ///   .batch(&tick, nondet!(/** test */));
654    /// let batch = process
655    ///   .source_iter(q!(vec![1, 2]))
656    ///   .batch(&tick, nondet!(/** test */));
657    /// stream.filter_not_in(batch).all_ticks()
658    /// # }, |mut stream| async move {
659    /// # for w in vec![3, 4] {
660    /// #     assert_eq!(stream.next().await.unwrap(), w);
661    /// # }
662    /// # }));
663    pub fn filter_not_in<O2>(
664        self,
665        other: Stream<T, L, Bounded, O2, R>,
666    ) -> Stream<T, L, Bounded, O, R>
667    where
668        T: Eq + Hash,
669    {
670        check_matching_location(&self.location, &other.location);
671
672        Stream::new(
673            self.location.clone(),
674            HydroNode::Difference {
675                pos: Box::new(self.ir_node.into_inner()),
676                neg: Box::new(other.ir_node.into_inner()),
677                metadata: self.location.new_node_metadata::<T>(),
678            },
679        )
680    }
681
682    /// An operator which allows you to "inspect" each element of a stream without
683    /// modifying it. The closure `f` is called on a reference to each item. This is
684    /// mainly useful for debugging, and should not be used to generate side-effects.
685    ///
686    /// # Example
687    /// ```rust
688    /// # use hydro_lang::*;
689    /// # use futures::StreamExt;
690    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
691    /// let nums = process.source_iter(q!(vec![1, 2]));
692    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
693    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
694    /// # }, |mut stream| async move {
695    /// # for w in vec![1, 2] {
696    /// #     assert_eq!(stream.next().await.unwrap(), w);
697    /// # }
698    /// # }));
699    /// ```
700    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
701    where
702        F: Fn(&T) + 'a,
703    {
704        let f = f.splice_fn1_borrow_ctx(&self.location).into();
705
706        if L::is_top_level() {
707            Stream::new(
708                self.location.clone(),
709                HydroNode::Persist {
710                    inner: Box::new(HydroNode::Inspect {
711                        f,
712                        input: Box::new(HydroNode::Unpersist {
713                            inner: Box::new(self.ir_node.into_inner()),
714                            metadata: self.location.new_node_metadata::<T>(),
715                        }),
716                        metadata: self.location.new_node_metadata::<T>(),
717                    }),
718                    metadata: self.location.new_node_metadata::<T>(),
719                },
720            )
721        } else {
722            Stream::new(
723                self.location.clone(),
724                HydroNode::Inspect {
725                    f,
726                    input: Box::new(self.ir_node.into_inner()),
727                    metadata: self.location.new_node_metadata::<T>(),
728                },
729            )
730        }
731    }
732
733    /// An operator which allows you to "name" a `HydroNode`.
734    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
735    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
736        {
737            let mut node = self.ir_node.borrow_mut();
738            let metadata = node.metadata_mut();
739            metadata.tag = Some(name.to_string());
740        }
741        self
742    }
743
744    /// Explicitly "casts" the stream to a type with a different ordering
745    /// guarantee. Useful in unsafe code where the ordering cannot be proven
746    /// by the type-system.
747    ///
748    /// # Non-Determinism
749    /// This function is used as an escape hatch, and any mistakes in the
750    /// provided ordering guarantee will propagate into the guarantees
751    /// for the rest of the program.
752    pub fn assume_ordering<O2>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
753        Stream::new(self.location, self.ir_node.into_inner())
754    }
755
756    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
757    /// which is always safe because that is the weakest possible guarantee.
758    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
759        let nondet = nondet!(/** this is a weaker odering guarantee, so it is safe to assume */);
760        self.assume_ordering::<NoOrder>(nondet)
761    }
762
763    /// Explicitly "casts" the stream to a type with a different retries
764    /// guarantee. Useful in unsafe code where the lack of retries cannot
765    /// be proven by the type-system.
766    ///
767    /// # Non-Determinism
768    /// This function is used as an escape hatch, and any mistakes in the
769    /// provided retries guarantee will propagate into the guarantees
770    /// for the rest of the program.
771    pub fn assume_retries<R2>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
772        Stream::new(self.location, self.ir_node.into_inner())
773    }
774
775    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
776    /// which is always safe because that is the weakest possible guarantee.
777    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
778        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
779        self.assume_retries::<AtLeastOnce>(nondet)
780    }
781
782    /// Weakens the retries guarantee provided by the stream to be the weaker of the
783    /// current guarantee and `R2`. This is safe because the output guarantee will
784    /// always be weaker than the input.
785    pub fn weaken_retries<R2>(self) -> Stream<T, L, B, O, <R as MinRetries<R2>>::Min>
786    where
787        R: MinRetries<R2>,
788    {
789        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
790        self.assume_retries::<<R as MinRetries<R2>>::Min>(nondet)
791    }
792}
793
794impl<'a, T, L, B: Boundedness, O> Stream<T, L, B, O, ExactlyOnce>
795where
796    L: Location<'a>,
797{
798    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
799    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
800    pub fn weaker_retries<R2>(self) -> Stream<T, L, B, O, R2> {
801        self.assume_retries(
802            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
803        )
804    }
805}
806
807impl<'a, T, L, B: Boundedness, O, R> Stream<&T, L, B, O, R>
808where
809    L: Location<'a>,
810{
811    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
812    ///
813    /// # Example
814    /// ```rust
815    /// # use hydro_lang::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
818    /// process.source_iter(q!(&[1, 2, 3])).cloned()
819    /// # }, |mut stream| async move {
820    /// // 1, 2, 3
821    /// # for w in vec![1, 2, 3] {
822    /// #     assert_eq!(stream.next().await.unwrap(), w);
823    /// # }
824    /// # }));
825    /// ```
826    pub fn cloned(self) -> Stream<T, L, B, O, R>
827    where
828        T: Clone,
829    {
830        self.map(q!(|d| d.clone()))
831    }
832}
833
834impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
835where
836    L: Location<'a>,
837{
838    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
839    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
840    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
841    ///
842    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
843    /// and there may be duplicates.
844    ///
845    /// # Example
846    /// ```rust
847    /// # use hydro_lang::*;
848    /// # use futures::StreamExt;
849    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
850    /// let tick = process.tick();
851    /// let bools = process.source_iter(q!(vec![false, true, false]));
852    /// let batch = bools.batch(&tick, nondet!(/** test */));
853    /// batch
854    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
855    ///     .all_ticks()
856    /// # }, |mut stream| async move {
857    /// // true
858    /// # assert_eq!(stream.next().await.unwrap(), true);
859    /// # }));
860    /// ```
861    pub fn fold_commutative_idempotent<A, I, F>(
862        self,
863        init: impl IntoQuotedMut<'a, I, L>,
864        comb: impl IntoQuotedMut<'a, F, L>,
865    ) -> Singleton<A, L, B>
866    where
867        I: Fn() -> A + 'a,
868        F: Fn(&mut A, T),
869    {
870        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
871        self.assume_ordering(nondet)
872            .assume_retries(nondet)
873            .fold(init, comb)
874    }
875
876    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
877    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
878    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
879    /// reference, so that it can be modified in place.
880    ///
881    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
882    /// and there may be duplicates.
883    ///
884    /// # Example
885    /// ```rust
886    /// # use hydro_lang::*;
887    /// # use futures::StreamExt;
888    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
889    /// let tick = process.tick();
890    /// let bools = process.source_iter(q!(vec![false, true, false]));
891    /// let batch = bools.batch(&tick, nondet!(/** test */));
892    /// batch
893    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
894    ///     .all_ticks()
895    /// # }, |mut stream| async move {
896    /// // true
897    /// # assert_eq!(stream.next().await.unwrap(), true);
898    /// # }));
899    /// ```
900    pub fn reduce_commutative_idempotent<F>(
901        self,
902        comb: impl IntoQuotedMut<'a, F, L>,
903    ) -> Optional<T, L, B>
904    where
905        F: Fn(&mut T, T) + 'a,
906    {
907        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
908        self.assume_ordering(nondet)
909            .assume_retries(nondet)
910            .reduce(comb)
911    }
912
913    /// Computes the maximum element in the stream as an [`Optional`], which
914    /// will be empty until the first element in the input arrives.
915    ///
916    /// # Example
917    /// ```rust
918    /// # use hydro_lang::*;
919    /// # use futures::StreamExt;
920    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
921    /// let tick = process.tick();
922    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
923    /// let batch = numbers.batch(&tick, nondet!(/** test */));
924    /// batch.max().all_ticks()
925    /// # }, |mut stream| async move {
926    /// // 4
927    /// # assert_eq!(stream.next().await.unwrap(), 4);
928    /// # }));
929    /// ```
930    pub fn max(self) -> Optional<T, L, B>
931    where
932        T: Ord,
933    {
934        self.reduce_commutative_idempotent(q!(|curr, new| {
935            if new > *curr {
936                *curr = new;
937            }
938        }))
939    }
940
941    /// Computes the maximum element in the stream as an [`Optional`], where the
942    /// maximum is determined according to the `key` function. The [`Optional`] will
943    /// be empty until the first element in the input arrives.
944    ///
945    /// # Example
946    /// ```rust
947    /// # use hydro_lang::*;
948    /// # use futures::StreamExt;
949    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
950    /// let tick = process.tick();
951    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
952    /// let batch = numbers.batch(&tick, nondet!(/** test */));
953    /// batch.max_by_key(q!(|x| -x)).all_ticks()
954    /// # }, |mut stream| async move {
955    /// // 1
956    /// # assert_eq!(stream.next().await.unwrap(), 1);
957    /// # }));
958    /// ```
    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
    where
        K: Ord,
        F: Fn(&T) -> K + 'a,
    {
        // Splice the user-provided key function so it can be embedded in the reducer below.
        let f = key.splice_fn1_borrow_ctx(&self.location);

        // The reducer is assembled by hand as a `syn::Expr`: it binds the key function once
        // and replaces the accumulator only when the incoming element's key is strictly
        // greater, so on equal keys the element already held is kept.
        let wrapped: syn::Expr = parse_quote!({
            let key_fn = #f;
            move |curr, new| {
                if key_fn(&new) > key_fn(&*curr) {
                    *curr = new;
                }
            }
        });

        // The reduction consumes this stream's IR node as its input.
        let mut core = HydroNode::Reduce {
            f: wrapped.into(),
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<T>(),
        };

        // At top-level locations the reduction is additionally wrapped in a `Persist` node,
        // mirroring what `reduce` does in this file.
        if L::is_top_level() {
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        Optional::new(self.location, core)
    }
990
991    /// Computes the minimum element in the stream as an [`Optional`], which
992    /// will be empty until the first element in the input arrives.
993    ///
994    /// # Example
995    /// ```rust
996    /// # use hydro_lang::*;
997    /// # use futures::StreamExt;
998    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
999    /// let tick = process.tick();
1000    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1001    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1002    /// batch.min().all_ticks()
1003    /// # }, |mut stream| async move {
1004    /// // 1
1005    /// # assert_eq!(stream.next().await.unwrap(), 1);
1006    /// # }));
1007    /// ```
1008    pub fn min(self) -> Optional<T, L, B>
1009    where
1010        T: Ord,
1011    {
1012        self.reduce_commutative_idempotent(q!(|curr, new| {
1013            if new < *curr {
1014                *curr = new;
1015            }
1016        }))
1017    }
1018}
1019
1020impl<'a, T, L, B: Boundedness, O> Stream<T, L, B, O, ExactlyOnce>
1021where
1022    L: Location<'a>,
1023{
1024    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1025    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1026    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1027    ///
1028    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1029    ///
1030    /// # Example
1031    /// ```rust
1032    /// # use hydro_lang::*;
1033    /// # use futures::StreamExt;
1034    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1035    /// let tick = process.tick();
1036    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1037    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1038    /// batch
1039    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1040    ///     .all_ticks()
1041    /// # }, |mut stream| async move {
1042    /// // 10
1043    /// # assert_eq!(stream.next().await.unwrap(), 10);
1044    /// # }));
1045    /// ```
1046    pub fn fold_commutative<A, I, F>(
1047        self,
1048        init: impl IntoQuotedMut<'a, I, L>,
1049        comb: impl IntoQuotedMut<'a, F, L>,
1050    ) -> Singleton<A, L, B>
1051    where
1052        I: Fn() -> A + 'a,
1053        F: Fn(&mut A, T),
1054    {
1055        let nondet = nondet!(/** the combinator function is commutative */);
1056        self.assume_ordering(nondet).fold(init, comb)
1057    }
1058
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1060    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1061    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1062    /// reference, so that it can be modified in place.
1063    ///
1064    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1065    ///
1066    /// # Example
1067    /// ```rust
1068    /// # use hydro_lang::*;
1069    /// # use futures::StreamExt;
1070    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1071    /// let tick = process.tick();
1072    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1073    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1074    /// batch
1075    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1076    ///     .all_ticks()
1077    /// # }, |mut stream| async move {
1078    /// // 10
1079    /// # assert_eq!(stream.next().await.unwrap(), 10);
1080    /// # }));
1081    /// ```
1082    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1083    where
1084        F: Fn(&mut T, T) + 'a,
1085    {
1086        let nondet = nondet!(/** the combinator function is commutative */);
1087        self.assume_ordering(nondet).reduce(comb)
1088    }
1089
1090    /// Computes the number of elements in the stream as a [`Singleton`].
1091    ///
1092    /// # Example
1093    /// ```rust
1094    /// # use hydro_lang::*;
1095    /// # use futures::StreamExt;
1096    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1097    /// let tick = process.tick();
1098    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1099    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1100    /// batch.count().all_ticks()
1101    /// # }, |mut stream| async move {
1102    /// // 4
1103    /// # assert_eq!(stream.next().await.unwrap(), 4);
1104    /// # }));
1105    /// ```
1106    pub fn count(self) -> Singleton<usize, L, B> {
1107        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1108    }
1109}
1110
1111impl<'a, T, L, B: Boundedness, R> Stream<T, L, B, TotalOrder, R>
1112where
1113    L: Location<'a>,
1114{
1115    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1116    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1117    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1118    ///
1119    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1120    ///
1121    /// # Example
1122    /// ```rust
1123    /// # use hydro_lang::*;
1124    /// # use futures::StreamExt;
1125    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1126    /// let tick = process.tick();
1127    /// let bools = process.source_iter(q!(vec![false, true, false]));
1128    /// let batch = bools.batch(&tick, nondet!(/** test */));
1129    /// batch
1130    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1131    ///     .all_ticks()
1132    /// # }, |mut stream| async move {
1133    /// // true
1134    /// # assert_eq!(stream.next().await.unwrap(), true);
1135    /// # }));
1136    /// ```
1137    pub fn fold_idempotent<A, I, F>(
1138        self,
1139        init: impl IntoQuotedMut<'a, I, L>,
1140        comb: impl IntoQuotedMut<'a, F, L>,
1141    ) -> Singleton<A, L, B>
1142    where
1143        I: Fn() -> A + 'a,
1144        F: Fn(&mut A, T),
1145    {
1146        let nondet = nondet!(/** the combinator function is idempotent */);
1147        self.assume_retries(nondet).fold(init, comb)
1148    }
1149
1150    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1151    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1152    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1153    /// reference, so that it can be modified in place.
1154    ///
1155    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1156    ///
1157    /// # Example
1158    /// ```rust
1159    /// # use hydro_lang::*;
1160    /// # use futures::StreamExt;
1161    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1162    /// let tick = process.tick();
1163    /// let bools = process.source_iter(q!(vec![false, true, false]));
1164    /// let batch = bools.batch(&tick, nondet!(/** test */));
1165    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1166    /// # }, |mut stream| async move {
1167    /// // true
1168    /// # assert_eq!(stream.next().await.unwrap(), true);
1169    /// # }));
1170    /// ```
1171    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1172    where
1173        F: Fn(&mut T, T) + 'a,
1174    {
1175        let nondet = nondet!(/** the combinator function is idempotent */);
1176        self.assume_retries(nondet).reduce(comb)
1177    }
1178
1179    /// Computes the first element in the stream as an [`Optional`], which
1180    /// will be empty until the first element in the input arrives.
1181    ///
1182    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1183    /// re-ordering of elements may cause the first element to change.
1184    ///
1185    /// # Example
1186    /// ```rust
1187    /// # use hydro_lang::*;
1188    /// # use futures::StreamExt;
1189    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1190    /// let tick = process.tick();
1191    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1192    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1193    /// batch.first().all_ticks()
1194    /// # }, |mut stream| async move {
1195    /// // 1
1196    /// # assert_eq!(stream.next().await.unwrap(), 1);
1197    /// # }));
1198    /// ```
1199    pub fn first(self) -> Optional<T, L, B> {
1200        self.reduce_idempotent(q!(|_, _| {}))
1201    }
1202
1203    /// Computes the last element in the stream as an [`Optional`], which
1204    /// will be empty until an element in the input arrives.
1205    ///
1206    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1207    /// re-ordering of elements may cause the last element to change.
1208    ///
1209    /// # Example
1210    /// ```rust
1211    /// # use hydro_lang::*;
1212    /// # use futures::StreamExt;
1213    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1214    /// let tick = process.tick();
1215    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1216    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1217    /// batch.last().all_ticks()
1218    /// # }, |mut stream| async move {
1219    /// // 4
1220    /// # assert_eq!(stream.next().await.unwrap(), 4);
1221    /// # }));
1222    /// ```
1223    pub fn last(self) -> Optional<T, L, B> {
1224        self.reduce_idempotent(q!(|curr, new| *curr = new))
1225    }
1226}
1227
1228impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1229where
1230    L: Location<'a>,
1231{
1232    /// Returns a stream with the current count tupled with each element in the input stream.
1233    ///
1234    /// # Example
1235    /// ```rust
1236    /// # use hydro_lang::{*, stream::ExactlyOnce};
1237    /// # use futures::StreamExt;
1238    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1239    /// let tick = process.tick();
1240    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1241    /// numbers.enumerate()
1242    /// # }, |mut stream| async move {
1243    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1244    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1245    /// #     assert_eq!(stream.next().await.unwrap(), w);
1246    /// # }
1247    /// # }));
1248    /// ```
1249    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1250        if L::is_top_level() {
1251            Stream::new(
1252                self.location.clone(),
1253                HydroNode::Persist {
1254                    inner: Box::new(HydroNode::Enumerate {
1255                        is_static: true,
1256                        input: Box::new(HydroNode::Unpersist {
1257                            inner: Box::new(self.ir_node.into_inner()),
1258                            metadata: self.location.new_node_metadata::<T>(),
1259                        }),
1260                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1261                    }),
1262                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1263                },
1264            )
1265        } else {
1266            Stream::new(
1267                self.location.clone(),
1268                HydroNode::Enumerate {
1269                    is_static: false,
1270                    input: Box::new(self.ir_node.into_inner()),
1271                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1272                },
1273            )
1274        }
1275    }
1276
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1278    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1279    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1280    ///
1281    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1282    /// to depend on the order of elements in the stream.
1283    ///
1284    /// # Example
1285    /// ```rust
1286    /// # use hydro_lang::*;
1287    /// # use futures::StreamExt;
1288    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1289    /// let tick = process.tick();
1290    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1291    /// let batch = words.batch(&tick, nondet!(/** test */));
1292    /// batch
1293    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1294    ///     .all_ticks()
1295    /// # }, |mut stream| async move {
1296    /// // "HELLOWORLD"
1297    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1298    /// # }));
1299    /// ```
1300    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1301        self,
1302        init: impl IntoQuotedMut<'a, I, L>,
1303        comb: impl IntoQuotedMut<'a, F, L>,
1304    ) -> Singleton<A, L, B> {
1305        let init = init.splice_fn0_ctx(&self.location).into();
1306        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1307
1308        let mut core = HydroNode::Fold {
1309            init,
1310            acc: comb,
1311            input: Box::new(self.ir_node.into_inner()),
1312            metadata: self.location.new_node_metadata::<A>(),
1313        };
1314
1315        if L::is_top_level() {
1316            // top-level (possibly unbounded) singletons are represented as
1317            // a stream which produces all values from all ticks every tick,
1318            // so Unpersist will always give the lastest aggregation
1319            core = HydroNode::Persist {
1320                inner: Box::new(core),
1321                metadata: self.location.new_node_metadata::<A>(),
1322            };
1323        }
1324
1325        Singleton::new(self.location, core)
1326    }
1327
1328    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1329        self.fold(
1330            q!(|| vec![]),
1331            q!(|acc, v| {
1332                acc.push(v);
1333            }),
1334        )
1335    }
1336
1337    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1338    /// and emitting each intermediate result.
1339    ///
1340    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1341    /// containing all intermediate accumulated values. The scan operation can also terminate early
1342    /// by returning `None`.
1343    ///
1344    /// The function takes a mutable reference to the accumulator and the current element, and returns
1345    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1346    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1347    ///
1348    /// # Examples
1349    ///
1350    /// Basic usage - running sum:
1351    /// ```rust
1352    /// # use hydro_lang::*;
1353    /// # use futures::StreamExt;
1354    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1355    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1356    ///     q!(|| 0),
1357    ///     q!(|acc, x| {
1358    ///         *acc += x;
1359    ///         Some(*acc)
1360    ///     }),
1361    /// )
1362    /// # }, |mut stream| async move {
1363    /// // Output: 1, 3, 6, 10
1364    /// # for w in vec![1, 3, 6, 10] {
1365    /// #     assert_eq!(stream.next().await.unwrap(), w);
1366    /// # }
1367    /// # }));
1368    /// ```
1369    ///
1370    /// Early termination example:
1371    /// ```rust
1372    /// # use hydro_lang::*;
1373    /// # use futures::StreamExt;
1374    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1375    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1376    ///     q!(|| 1),
1377    ///     q!(|state, x| {
1378    ///         *state = *state * x;
1379    ///         if *state > 6 {
1380    ///             None // Terminate the stream
1381    ///         } else {
1382    ///             Some(-*state)
1383    ///         }
1384    ///     }),
1385    /// )
1386    /// # }, |mut stream| async move {
1387    /// // Output: -1, -2, -6
1388    /// # for w in vec![-1, -2, -6] {
1389    /// #     assert_eq!(stream.next().await.unwrap(), w);
1390    /// # }
1391    /// # }));
1392    /// ```
1393    pub fn scan<A, U, I, F>(
1394        self,
1395        init: impl IntoQuotedMut<'a, I, L>,
1396        f: impl IntoQuotedMut<'a, F, L>,
1397    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1398    where
1399        I: Fn() -> A + 'a,
1400        F: Fn(&mut A, T) -> Option<U> + 'a,
1401    {
1402        let init = init.splice_fn0_ctx(&self.location).into();
1403        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1404
1405        if L::is_top_level() {
1406            Stream::new(
1407                self.location.clone(),
1408                HydroNode::Persist {
1409                    inner: Box::new(HydroNode::Scan {
1410                        init,
1411                        acc: f,
1412                        input: Box::new(HydroNode::Unpersist {
1413                            inner: Box::new(self.ir_node.into_inner()),
1414                            metadata: self.location.new_node_metadata::<U>(),
1415                        }),
1416                        metadata: self.location.new_node_metadata::<U>(),
1417                    }),
1418                    metadata: self.location.new_node_metadata::<U>(),
1419                },
1420            )
1421        } else {
1422            Stream::new(
1423                self.location.clone(),
1424                HydroNode::Scan {
1425                    init,
1426                    acc: f,
1427                    input: Box::new(self.ir_node.into_inner()),
1428                    metadata: self.location.new_node_metadata::<U>(),
1429                },
1430            )
1431        }
1432    }
1433
1434    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1435    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1436    /// until the first element in the input arrives.
1437    ///
1438    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1439    /// to depend on the order of elements in the stream.
1440    ///
1441    /// # Example
1442    /// ```rust
1443    /// # use hydro_lang::*;
1444    /// # use futures::StreamExt;
1445    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1446    /// let tick = process.tick();
1447    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1448    /// let batch = words.batch(&tick, nondet!(/** test */));
1449    /// batch
1450    ///     .map(q!(|x| x.to_string()))
1451    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1452    ///     .all_ticks()
1453    /// # }, |mut stream| async move {
1454    /// // "HELLOWORLD"
1455    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1456    /// # }));
1457    /// ```
1458    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1459        self,
1460        comb: impl IntoQuotedMut<'a, F, L>,
1461    ) -> Optional<T, L, B> {
1462        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1463        let mut core = HydroNode::Reduce {
1464            f,
1465            input: Box::new(self.ir_node.into_inner()),
1466            metadata: self.location.new_node_metadata::<T>(),
1467        };
1468
1469        if L::is_top_level() {
1470            core = HydroNode::Persist {
1471                inner: Box::new(core),
1472                metadata: self.location.new_node_metadata::<T>(),
1473            };
1474        }
1475
1476        Optional::new(self.location, core)
1477    }
1478}
1479
1480impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1481    /// Produces a new stream that interleaves the elements of the two input streams.
1482    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1483    ///
1484    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1485    /// [`Bounded`], you can use [`Stream::chain`] instead.
1486    ///
1487    /// # Example
1488    /// ```rust
1489    /// # use hydro_lang::*;
1490    /// # use futures::StreamExt;
1491    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1492    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1493    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1494    /// # }, |mut stream| async move {
1495    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1496    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1497    /// #     assert_eq!(stream.next().await.unwrap(), w);
1498    /// # }
1499    /// # }));
1500    /// ```
1501    pub fn interleave<O2, R2: MinRetries<R>>(
1502        self,
1503        other: Stream<T, L, Unbounded, O2, R2>,
1504    ) -> Stream<T, L, Unbounded, NoOrder, R::Min>
1505    where
1506        R: MinRetries<R2, Min = R2::Min>,
1507    {
1508        let tick = self.location.tick();
1509        // Because the outputs are unordered, we can interleave batches from both streams.
1510        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1511        self.batch(&tick, nondet_batch_interleaving)
1512            .weakest_ordering()
1513            .weaken_retries::<R2>()
1514            .chain(
1515                other
1516                    .batch(&tick, nondet_batch_interleaving)
1517                    .weakest_ordering()
1518                    .weaken_retries::<R>(),
1519            )
1520            .all_ticks()
1521    }
1522}
1523
1524impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
1525where
1526    L: Location<'a>,
1527{
1528    /// Produces a new stream that emits the input elements in sorted order.
1529    ///
1530    /// The input stream can have any ordering guarantee, but the output stream
1531    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1532    /// elements in the input stream are available, so it requires the input stream
1533    /// to be [`Bounded`].
1534    ///
1535    /// # Example
1536    /// ```rust
1537    /// # use hydro_lang::*;
1538    /// # use futures::StreamExt;
1539    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1540    /// let tick = process.tick();
1541    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1542    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1543    /// batch.sort().all_ticks()
1544    /// # }, |mut stream| async move {
1545    /// // 1, 2, 3, 4
1546    /// # for w in (1..5) {
1547    /// #     assert_eq!(stream.next().await.unwrap(), w);
1548    /// # }
1549    /// # }));
1550    /// ```
1551    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1552    where
1553        T: Ord,
1554    {
1555        Stream::new(
1556            self.location.clone(),
1557            HydroNode::Sort {
1558                input: Box::new(self.ir_node.into_inner()),
1559                metadata: self.location.new_node_metadata::<T>(),
1560            },
1561        )
1562    }
1563
1564    /// Produces a new stream that first emits the elements of the `self` stream,
1565    /// and then emits the elements of the `other` stream. The output stream has
1566    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1567    /// [`TotalOrder`] guarantee.
1568    ///
1569    /// Currently, both input streams must be [`Bounded`]. This operator will block
1570    /// on the first stream until all its elements are available. In a future version,
1571    /// we will relax the requirement on the `other` stream.
1572    ///
1573    /// # Example
1574    /// ```rust
1575    /// # use hydro_lang::*;
1576    /// # use futures::StreamExt;
1577    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1578    /// let tick = process.tick();
1579    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1580    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1581    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1582    /// # }, |mut stream| async move {
1583    /// // 2, 3, 4, 5, 1, 2, 3, 4
1584    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1585    /// #     assert_eq!(stream.next().await.unwrap(), w);
1586    /// # }
1587    /// # }));
1588    /// ```
1589    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1590    where
1591        O: MinOrder<O2>,
1592    {
1593        check_matching_location(&self.location, &other.location);
1594
1595        Stream::new(
1596            self.location.clone(),
1597            HydroNode::Chain {
1598                first: Box::new(self.ir_node.into_inner()),
1599                second: Box::new(other.ir_node.into_inner()),
1600                metadata: self.location.new_node_metadata::<T>(),
1601            },
1602        )
1603    }
1604
1605    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output is totally ordered when both inputs are,
    /// because this is compiled into a nested loop.
1608    pub fn cross_product_nested_loop<T2, O2>(
1609        self,
1610        other: Stream<T2, L, Bounded, O2, R>,
1611    ) -> Stream<(T, T2), L, Bounded, O::Min, R>
1612    where
1613        T: Clone,
1614        T2: Clone,
1615        O: MinOrder<O2>,
1616    {
1617        check_matching_location(&self.location, &other.location);
1618
1619        Stream::new(
1620            self.location.clone(),
1621            HydroNode::CrossProduct {
1622                left: Box::new(self.ir_node.into_inner()),
1623                right: Box::new(other.ir_node.into_inner()),
1624                metadata: self.location.new_node_metadata::<(T, T2)>(),
1625            },
1626        )
1627    }
1628}
1629
1630impl<'a, K, V1, L, B: Boundedness, O, R> Stream<(K, V1), L, B, O, R>
1631where
1632    L: Location<'a>,
1633{
1634    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1635    /// by equi-joining the two streams on the key attribute `K`.
1636    ///
1637    /// # Example
1638    /// ```rust
1639    /// # use hydro_lang::*;
1640    /// # use std::collections::HashSet;
1641    /// # use futures::StreamExt;
1642    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1643    /// let tick = process.tick();
1644    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1645    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1646    /// stream1.join(stream2)
1647    /// # }, |mut stream| async move {
1648    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1649    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1650    /// # stream.map(|i| assert!(expected.contains(&i)));
1651    /// # }));
1652    pub fn join<V2, O2>(
1653        self,
1654        n: Stream<(K, V2), L, B, O2, R>,
1655    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
1656    where
1657        K: Eq + Hash,
1658    {
1659        check_matching_location(&self.location, &n.location);
1660
1661        Stream::new(
1662            self.location.clone(),
1663            HydroNode::Join {
1664                left: Box::new(self.ir_node.into_inner()),
1665                right: Box::new(n.ir_node.into_inner()),
1666                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1667            },
1668        )
1669    }
1670
1671    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1672    /// computes the anti-join of the items in the input -- i.e. returns
1673    /// unique items in the first input that do not have a matching key
1674    /// in the second input.
1675    ///
1676    /// # Example
1677    /// ```rust
1678    /// # use hydro_lang::*;
1679    /// # use futures::StreamExt;
1680    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1681    /// let tick = process.tick();
1682    /// let stream = process
1683    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1684    ///   .batch(&tick, nondet!(/** test */));
1685    /// let batch = process
1686    ///   .source_iter(q!(vec![1, 2]))
1687    ///   .batch(&tick, nondet!(/** test */));
1688    /// stream.anti_join(batch).all_ticks()
1689    /// # }, |mut stream| async move {
1690    /// # for w in vec![(3, 'c'), (4, 'd')] {
1691    /// #     assert_eq!(stream.next().await.unwrap(), w);
1692    /// # }
    /// # }));
    /// ```
1694    pub fn anti_join<O2, R2>(self, n: Stream<K, L, Bounded, O2, R2>) -> Stream<(K, V1), L, B, O, R>
1695    where
1696        K: Eq + Hash,
1697    {
1698        check_matching_location(&self.location, &n.location);
1699
1700        Stream::new(
1701            self.location.clone(),
1702            HydroNode::AntiJoin {
1703                pos: Box::new(self.ir_node.into_inner()),
1704                neg: Box::new(n.ir_node.into_inner()),
1705                metadata: self.location.new_node_metadata::<(K, V1)>(),
1706            },
1707        )
1708    }
1709}
1710
1711impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> Stream<(K, V), L, B, O, R> {
1712    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1713        KeyedStream {
1714            underlying: self.weakest_ordering(),
1715            _phantom_order: Default::default(),
1716        }
1717    }
1718}
1719
1720impl<'a, K, V, L, B: Boundedness> Stream<(K, V), L, B, TotalOrder, ExactlyOnce>
1721where
1722    K: Eq + Hash,
1723    L: Location<'a>,
1724{
1725    /// A special case of [`Stream::scan`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1726    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1727    /// in the second element are transformed via the `f` combinator.
1728    ///
    /// Unlike [`Stream::fold_keyed`], which only returns the final accumulated value, `scan_keyed` produces a new
    /// stream containing all intermediate accumulated values paired with the key. The scan for an individual key
    /// can also be cut short by returning `None`.
    ///
    /// The function takes a mutable reference to the key's accumulator and the current value, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `(key, value)` is emitted to the output stream.
    /// If the function returns `None`, nothing is emitted and the accumulator for that key is discarded; other
    /// keys are unaffected, and in the current implementation a later element with the same key starts over
    /// from a fresh `init()` value.
1736    ///
1737    /// # Example
1738    /// ```rust
1739    /// # use hydro_lang::*;
1740    /// # use futures::StreamExt;
1741    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1742    /// process
1743    ///     .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
1744    ///     .scan_keyed(
1745    ///         q!(|| 0),
1746    ///         q!(|acc, x| {
1747    ///             *acc += x;
1748    ///             Some(*acc)
1749    ///         }),
1750    ///     )
1751    /// # }, |mut stream| async move {
1752    /// // Output: (0, 1), (0, 3), (1, 3), (1, 7)
1753    /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
1754    /// #     assert_eq!(stream.next().await.unwrap(), w);
1755    /// # }
1756    /// # }));
1757    /// ```
    pub fn scan_keyed<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<(K, U), L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Wrap the user-provided quoted closures so they can be referenced from inside the
        // `q!` closure below; each one is spliced against the context it is handed.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Built on top of `scan`, using a `HashMap` from key to per-key accumulator as the state.
        self.scan(
            q!(|| HashMap::new()),
            q!(move |acc, (k, v)| {
                // Look up this key's accumulator, creating it with `init` if the key has no entry.
                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
                if let Some(out) = f(existing_state, v) {
                    Some(Some((k, out)))
                } else {
                    // `f` returned `None`: discard this key's accumulator and emit a `None`
                    // placeholder. The outer value is still `Some(..)`, presumably so that the
                    // underlying `scan` itself keeps running for the other keys.
                    acc.remove(&k);
                    Some(None)
                }
            }),
        )
        // Unwrap the inner `Option`s: `None` placeholders yield nothing, `Some((k, out))` yields the pair.
        .flatten_ordered()
    }
1784
1785    /// Like [`Stream::fold_keyed`], in the spirit of SQL's GROUP BY and aggregation constructs. But the aggregation
1786    /// function returns a boolean, which when true indicates that the aggregated result is complete and can be
1787    /// released to downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input stream
1788    /// is [`Unbounded`], the outputs of the fold can be processed like normal stream elements.
1789    ///
1790    /// # Example
1791    /// ```rust
1792    /// # use hydro_lang::*;
1793    /// # use futures::StreamExt;
1794    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1795    /// process
1796    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1797    ///     .fold_keyed_early_stop(
1798    ///         q!(|| 0),
1799    ///         q!(|acc, x| {
1800    ///             *acc += x;
1801    ///             x % 2 == 0
1802    ///         }),
1803    ///     )
1804    /// # }, |mut stream| async move {
1805    /// // Output: (0, 2), (1, 9)
1806    /// # for w in vec![(0, 2), (1, 9)] {
1807    /// #     assert_eq!(stream.next().await.unwrap(), w);
1808    /// # }
1809    /// # }));
1810    /// ```
    pub fn fold_keyed_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<(K, A), L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Wrap the user-provided quoted closures so they can be referenced from inside the
        // `q!` closure below; each one is spliced against the context it is handed.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Built on top of `scan`, using a `HashMap` from key to in-progress accumulator as the state.
        self.scan(
            q!(|| HashMap::new()),
            q!(move |acc, (k, v)| {
                // Look up this key's accumulator, creating it with `init` if the key has no entry.
                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
                if f(existing_state, v) {
                    // `f` reported completion: take the accumulator out of the map and release it.
                    // The entry was inserted (or already present) just above, so `unwrap` cannot fail.
                    // Because the entry is removed, a later element with the same key starts a new fold.
                    let out = acc.remove(&k).unwrap();
                    Some(Some((k, out)))
                } else {
                    // Not complete yet: keep accumulating and emit a `None` placeholder.
                    Some(None)
                }
            }),
        )
        // Unwrap the inner `Option`s: `None` placeholders yield nothing, `Some((k, out))` yields the pair.
        .flatten_ordered()
    }
1837}
1838
1839impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1840where
1841    K: Eq + Hash,
1842    L: Location<'a>,
1843{
1844    #[deprecated = "use .into_keyed().fold(...) instead"]
1845    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1846    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1847    /// in the second element are accumulated via the `comb` closure.
1848    ///
1849    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1850    /// to depend on the order of elements in the stream.
1851    ///
1852    /// If the input and output value types are the same and do not require initialization then use
1853    /// [`Stream::reduce_keyed`].
1854    ///
1855    /// # Example
1856    /// ```rust
1857    /// # use hydro_lang::*;
1858    /// # use futures::StreamExt;
1859    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1860    /// let tick = process.tick();
1861    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1862    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1863    /// batch
1864    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1865    ///     .all_ticks()
1866    /// # }, |mut stream| async move {
1867    /// // (1, 5), (2, 7)
1868    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1869    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1870    /// # }));
1871    /// ```
1872    pub fn fold_keyed<A, I, F>(
1873        self,
1874        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1875        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1876    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1877    where
1878        I: Fn() -> A + 'a,
1879        F: Fn(&mut A, V) + 'a,
1880    {
1881        self.into_keyed().fold(init, comb).entries()
1882    }
1883
1884    #[deprecated = "use .into_keyed().reduce(...) instead"]
1885    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1886    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1887    /// in the second element are accumulated via the `comb` closure.
1888    ///
1889    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1890    /// to depend on the order of elements in the stream.
1891    ///
1892    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1893    ///
1894    /// # Example
1895    /// ```rust
1896    /// # use hydro_lang::*;
1897    /// # use futures::StreamExt;
1898    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1899    /// let tick = process.tick();
1900    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1901    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1902    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1903    /// # }, |mut stream| async move {
1904    /// // (1, 5), (2, 7)
1905    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1906    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1907    /// # }));
1908    /// ```
1909    pub fn reduce_keyed<F>(
1910        self,
1911        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1912    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1913    where
1914        F: Fn(&mut V, V) + 'a,
1915    {
1916        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1917
1918        Stream::new(
1919            self.location.clone(),
1920            HydroNode::ReduceKeyed {
1921                f,
1922                input: Box::new(self.ir_node.into_inner()),
1923                metadata: self.location.new_node_metadata::<(K, V)>(),
1924            },
1925        )
1926    }
1927}
1928
1929impl<'a, K, V, L, O, R> Stream<(K, V), Tick<L>, Bounded, O, R>
1930where
1931    K: Eq + Hash,
1932    L: Location<'a>,
1933{
1934    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1935    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1936    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1937    /// in the second element are accumulated via the `comb` closure.
1938    ///
1939    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1940    /// as there may be non-deterministic duplicates.
1941    ///
1942    /// If the input and output value types are the same and do not require initialization then use
1943    /// [`Stream::reduce_keyed_commutative_idempotent`].
1944    ///
1945    /// # Example
1946    /// ```rust
1947    /// # use hydro_lang::*;
1948    /// # use futures::StreamExt;
1949    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1950    /// let tick = process.tick();
1951    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1952    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1953    /// batch
1954    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1955    ///     .all_ticks()
1956    /// # }, |mut stream| async move {
1957    /// // (1, false), (2, true)
1958    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1959    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1960    /// # }));
1961    /// ```
1962    pub fn fold_keyed_commutative_idempotent<A, I, F>(
1963        self,
1964        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1965        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1966    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1967    where
1968        I: Fn() -> A + 'a,
1969        F: Fn(&mut A, V) + 'a,
1970    {
1971        self.into_keyed()
1972            .fold_commutative_idempotent(init, comb)
1973            .entries()
1974    }
1975
1976    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1977    /// # Example
1978    /// ```rust
1979    /// # use hydro_lang::*;
1980    /// # use futures::StreamExt;
1981    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1982    /// let tick = process.tick();
1983    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1984    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1985    /// batch.keys().all_ticks()
1986    /// # }, |mut stream| async move {
1987    /// // 1, 2
1988    /// # assert_eq!(stream.next().await.unwrap(), 1);
1989    /// # assert_eq!(stream.next().await.unwrap(), 2);
1990    /// # }));
1991    /// ```
1992    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1993        self.into_keyed()
1994            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
1995            .keys()
1996    }
1997
1998    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
1999    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2000    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2001    /// in the second element are accumulated via the `comb` closure.
2002    ///
2003    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2004    /// as there may be non-deterministic duplicates.
2005    ///
2006    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2007    ///
2008    /// # Example
2009    /// ```rust
2010    /// # use hydro_lang::*;
2011    /// # use futures::StreamExt;
2012    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2013    /// let tick = process.tick();
2014    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2015    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2016    /// batch
2017    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2018    ///     .all_ticks()
2019    /// # }, |mut stream| async move {
2020    /// // (1, false), (2, true)
2021    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2022    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2023    /// # }));
2024    /// ```
2025    pub fn reduce_keyed_commutative_idempotent<F>(
2026        self,
2027        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2028    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2029    where
2030        F: Fn(&mut V, V) + 'a,
2031    {
2032        self.into_keyed()
2033            .reduce_commutative_idempotent(comb)
2034            .entries()
2035    }
2036}
2037
2038impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2039where
2040    K: Eq + Hash,
2041    L: Location<'a>,
2042{
2043    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2044    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2045    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2046    /// in the second element are accumulated via the `comb` closure.
2047    ///
2048    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2049    ///
2050    /// If the input and output value types are the same and do not require initialization then use
2051    /// [`Stream::reduce_keyed_commutative`].
2052    ///
2053    /// # Example
2054    /// ```rust
2055    /// # use hydro_lang::*;
2056    /// # use futures::StreamExt;
2057    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2058    /// let tick = process.tick();
2059    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2060    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2061    /// batch
2062    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2063    ///     .all_ticks()
2064    /// # }, |mut stream| async move {
2065    /// // (1, 5), (2, 7)
2066    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2067    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2068    /// # }));
2069    /// ```
2070    pub fn fold_keyed_commutative<A, I, F>(
2071        self,
2072        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2073        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2074    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2075    where
2076        I: Fn() -> A + 'a,
2077        F: Fn(&mut A, V) + 'a,
2078    {
2079        self.into_keyed().fold_commutative(init, comb).entries()
2080    }
2081
2082    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2083    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2084    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2085    /// in the second element are accumulated via the `comb` closure.
2086    ///
2087    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2088    ///
2089    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2090    ///
2091    /// # Example
2092    /// ```rust
2093    /// # use hydro_lang::*;
2094    /// # use futures::StreamExt;
2095    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2096    /// let tick = process.tick();
2097    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2098    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2099    /// batch
2100    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2101    ///     .all_ticks()
2102    /// # }, |mut stream| async move {
2103    /// // (1, 5), (2, 7)
2104    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2105    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2106    /// # }));
2107    /// ```
2108    pub fn reduce_keyed_commutative<F>(
2109        self,
2110        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2111    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2112    where
2113        F: Fn(&mut V, V) + 'a,
2114    {
2115        self.into_keyed().reduce_commutative(comb).entries()
2116    }
2117}
2118
2119impl<'a, K, V, L, R> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2120where
2121    K: Eq + Hash,
2122    L: Location<'a>,
2123{
2124    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2125    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2126    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2127    /// in the second element are accumulated via the `comb` closure.
2128    ///
2129    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2130    ///
2131    /// If the input and output value types are the same and do not require initialization then use
2132    /// [`Stream::reduce_keyed_idempotent`].
2133    ///
2134    /// # Example
2135    /// ```rust
2136    /// # use hydro_lang::*;
2137    /// # use futures::StreamExt;
2138    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2139    /// let tick = process.tick();
2140    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2141    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2142    /// batch
2143    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2144    ///     .all_ticks()
2145    /// # }, |mut stream| async move {
2146    /// // (1, false), (2, true)
2147    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2148    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2149    /// # }));
2150    /// ```
2151    pub fn fold_keyed_idempotent<A, I, F>(
2152        self,
2153        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2154        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2155    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2156    where
2157        I: Fn() -> A + 'a,
2158        F: Fn(&mut A, V) + 'a,
2159    {
2160        self.into_keyed().fold_idempotent(init, comb).entries()
2161    }
2162
2163    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2164    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2165    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2166    /// in the second element are accumulated via the `comb` closure.
2167    ///
2168    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2169    ///
2170    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2171    ///
2172    /// # Example
2173    /// ```rust
2174    /// # use hydro_lang::*;
2175    /// # use futures::StreamExt;
2176    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2177    /// let tick = process.tick();
2178    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2179    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180    /// batch
2181    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2182    ///     .all_ticks()
2183    /// # }, |mut stream| async move {
2184    /// // (1, false), (2, true)
2185    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2186    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2187    /// # }));
2188    /// ```
2189    pub fn reduce_keyed_idempotent<F>(
2190        self,
2191        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2192    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2193    where
2194        F: Fn(&mut V, V) + 'a,
2195    {
2196        self.into_keyed().reduce_idempotent(comb).entries()
2197    }
2198}
2199
2200impl<'a, T, L, B: Boundedness, O, R> Stream<T, Atomic<L>, B, O, R>
2201where
2202    L: Location<'a> + NoTick,
2203{
2204    /// Returns a stream corresponding to the latest batch of elements being atomically
2205    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2206    /// the order of the input.
2207    ///
2208    /// # Non-Determinism
2209    /// The batch boundaries are non-deterministic and may change across executions.
2210    pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2211        Stream::new(
2212            self.location.clone().tick,
2213            HydroNode::Unpersist {
2214                inner: Box::new(self.ir_node.into_inner()),
2215                metadata: self.location.new_node_metadata::<T>(),
2216            },
2217        )
2218    }
2219
2220    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2221        Stream::new(self.location.tick.l, self.ir_node.into_inner())
2222    }
2223
2224    pub fn atomic_source(&self) -> Tick<L> {
2225        self.location.tick.clone()
2226    }
2227}
2228
2229impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
2230where
2231    L: Location<'a> + NoTick + NoAtomic,
2232{
2233    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2234        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2235    }
2236
2237    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2238    /// Future outputs are produced as available, regardless of input arrival order.
2239    ///
2240    /// # Example
2241    /// ```rust
2242    /// # use std::collections::HashSet;
2243    /// # use futures::StreamExt;
2244    /// # use hydro_lang::*;
2245    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2246    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2247    ///     .map(q!(|x| async move {
2248    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2249    ///         x
2250    ///     }))
2251    ///     .resolve_futures()
2252    /// #   },
2253    /// #   |mut stream| async move {
2254    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2255    /// #       let mut output = HashSet::new();
2256    /// #       for _ in 1..10 {
2257    /// #           output.insert(stream.next().await.unwrap());
2258    /// #       }
2259    /// #       assert_eq!(
2260    /// #           output,
2261    /// #           HashSet::<i32>::from_iter(1..10)
2262    /// #       );
2263    /// #   },
    /// # ));
    /// ```
2265    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2266    where
2267        T: Future<Output = T2>,
2268    {
2269        Stream::new(
2270            self.location.clone(),
2271            HydroNode::ResolveFutures {
2272                input: Box::new(self.ir_node.into_inner()),
2273                metadata: self.location.new_node_metadata::<T2>(),
2274            },
2275        )
2276    }
2277
2278    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2279    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2280    /// the order of the input.
2281    ///
2282    /// # Non-Determinism
2283    /// The batch boundaries are non-deterministic and may change across executions.
2284    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2285        self.atomic(tick).batch(nondet)
2286    }
2287
2288    /// Given a time interval, returns a stream corresponding to samples taken from the
2289    /// stream roughly at that interval. The output will have elements in the same order
2290    /// as the input, but with arbitrary elements skipped between samples. There is also
2291    /// no guarantee on the exact timing of the samples.
2292    ///
2293    /// # Non-Determinism
2294    /// The output stream is non-deterministic in which elements are sampled, since this
2295    /// is controlled by a clock.
2296    pub fn sample_every(
2297        self,
2298        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2299        nondet: NonDet,
2300    ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2301        let samples = self.location.source_interval(interval, nondet);
2302
2303        let tick = self.location.tick();
2304        self.batch(&tick, nondet)
2305            .continue_if(samples.batch(&tick, nondet).first())
2306            .all_ticks()
2307            .weakest_retries()
2308    }
2309
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value for longer than that duration (or has never emitted one).
2312    ///
2313    /// # Non-Determinism
2314    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2315    /// samples take place, timeouts may be non-deterministically generated or missed,
2316    /// and the notification of the timeout may be delayed as well. There is also no
2317    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2318    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded> {
        let tick = self.location.tick();

        // Track the instant at which the most recent element was folded in. The
        // accumulator starts as `None` and is overwritten with `Some(now)` per element.
        let latest_received = self.assume_retries(nondet).fold_commutative(
            q!(|| None),
            q!(|latest, _| {
                *latest = Some(Instant::now());
            }),
        );

        // Snapshot the tracked instant inside the tick and map it to a unit value
        // only when the timeout condition holds.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    // An element has been seen: report a timeout only if strictly more
                    // than `duration` has elapsed since it was recorded.
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    // No element has ever been seen: this is also reported as a timeout.
                    Some(())
                }
            }))
            .latest()
    }
2348}
2349
2350impl<'a, F, T, L, B: Boundedness, O, R> Stream<F, L, B, O, R>
2351where
2352    L: Location<'a> + NoTick + NoAtomic,
2353    F: Future<Output = T>,
2354{
2355    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2356    /// Future outputs are produced in the same order as the input stream.
2357    ///
2358    /// # Example
2359    /// ```rust
2360    /// # use std::collections::HashSet;
2361    /// # use futures::StreamExt;
2362    /// # use hydro_lang::*;
2363    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2364    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2365    ///     .map(q!(|x| async move {
2366    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2367    ///         x
2368    ///     }))
2369    ///     .resolve_futures_ordered()
2370    /// #   },
2371    /// #   |mut stream| async move {
2372    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2373    /// #       let mut output = Vec::new();
2374    /// #       for _ in 1..10 {
2375    /// #           output.push(stream.next().await.unwrap());
2376    /// #       }
2377    /// #       assert_eq!(
2378    /// #           output,
2379    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2380    /// #       );
2381    /// #   },
2382    /// # ));
2383    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2384        Stream::new(
2385            self.location.clone(),
2386            HydroNode::ResolveFuturesOrdered {
2387                input: Box::new(self.ir_node.into_inner()),
2388                metadata: self.location.new_node_metadata::<T>(),
2389            },
2390        )
2391    }
2392}
2393
2394impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
2395where
2396    L: Location<'a> + NoTick,
2397{
2398    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2399        let f = f.splice_fn1_ctx(&self.location).into();
2400        let metadata = self.location.new_node_metadata::<T>();
2401        self.location
2402            .flow_state()
2403            .borrow_mut()
2404            .leaves
2405            .as_mut()
2406            .expect(FLOW_USED_MESSAGE)
2407            .push(HydroLeaf::ForEach {
2408                input: Box::new(HydroNode::Unpersist {
2409                    inner: Box::new(self.ir_node.into_inner()),
2410                    metadata: metadata.clone(),
2411                }),
2412                f,
2413                metadata,
2414            });
2415    }
2416
2417    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2418    where
2419        S: 'a + futures::Sink<T> + Unpin,
2420    {
2421        self.location
2422            .flow_state()
2423            .borrow_mut()
2424            .leaves
2425            .as_mut()
2426            .expect(FLOW_USED_MESSAGE)
2427            .push(HydroLeaf::DestSink {
2428                sink: sink.splice_typed_ctx(&self.location).into(),
2429                input: Box::new(self.ir_node.into_inner()),
2430                metadata: self.location.new_node_metadata::<T>(),
2431            });
2432    }
2433}
2434
2435impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
2436where
2437    L: Location<'a>,
2438{
2439    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2440        Stream::new(
2441            self.location.outer().clone(),
2442            HydroNode::Persist {
2443                inner: Box::new(self.ir_node.into_inner()),
2444                metadata: self.location.new_node_metadata::<T>(),
2445            },
2446        )
2447    }
2448
2449    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2450        Stream::new(
2451            Atomic {
2452                tick: self.location.clone(),
2453            },
2454            HydroNode::Persist {
2455                inner: Box::new(self.ir_node.into_inner()),
2456                metadata: self.location.new_node_metadata::<T>(),
2457            },
2458        )
2459    }
2460
2461    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2462    where
2463        T: Clone,
2464    {
2465        Stream::new(
2466            self.location.clone(),
2467            HydroNode::Persist {
2468                inner: Box::new(self.ir_node.into_inner()),
2469                metadata: self.location.new_node_metadata::<T>(),
2470            },
2471        )
2472    }
2473
2474    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2475        Stream::new(
2476            self.location.clone(),
2477            HydroNode::DeferTick {
2478                input: Box::new(self.ir_node.into_inner()),
2479                metadata: self.location.new_node_metadata::<T>(),
2480            },
2481        )
2482    }
2483
2484    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2485        Stream::new(
2486            self.location.clone(),
2487            HydroNode::Delta {
2488                inner: Box::new(self.ir_node.into_inner()),
2489                metadata: self.location.new_node_metadata::<T>(),
2490            },
2491        )
2492    }
2493}
2494
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    /// Tag type for the first process in the test flows.
    struct P1 {}
    /// Tag type for the second process; also used to tag the external in
    /// `first_ten_distributed`.
    struct P2 {}

    /// Minimal serializable payload used to exercise the bincode network operators.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process through a second process to an external, and
    /// checks the external observes the values in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let sender = flow.process::<P1>();
        let relay = flow.process::<P2>();
        let external = flow.external::<P2>();

        let out_port = sender
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&relay)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output port before starting so no messages are missed.
        let mut received = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let message = received.next().await.unwrap();
            assert_eq!(message.n, expected);
        }
    }

    /// Checks that `first()` on a three-element stream yields exactly one element,
    /// by counting the result and reading the count from an external.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let count_port = tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = nodes.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = received.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}