hydro_lang/
keyed_stream.rs

1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use stageleft::{IntoQuotedMut, QuotedWithContext, q};
5
6use crate::boundedness::Boundedness;
7use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
8use crate::ir::HydroNode;
9use crate::keyed_singleton::KeyedSingleton;
10use crate::location::tick::NoAtomic;
11use crate::location::{LocationId, NoTick, check_matching_location};
12use crate::manual_expr::ManualExpr;
13use crate::stream::{ExactlyOnce, MinOrder, MinRetries};
14use crate::unsafety::NonDet;
15use crate::*;
16
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`,
/// where the order of keys is non-deterministic but the order *within* each group may
/// be deterministic.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::stream::AtLeastOnce`])
pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> {
    /// The flat stream of `(key, value)` pairs backing this keyed stream. It is always typed
    /// as [`NoOrder`]; the per-group ordering guarantee is carried by the `Order` parameter.
    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retries>,
    /// Zero-sized marker that records the per-group `Order` guarantee at the type level.
    pub(crate) _phantom_order: PhantomData<Order>,
}
34
35impl<'a, K, V, L, B: Boundedness, R> From<KeyedStream<K, V, L, B, TotalOrder, R>>
36    for KeyedStream<K, V, L, B, NoOrder, R>
37where
38    L: Location<'a>,
39{
40    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
41        KeyedStream {
42            underlying: stream.underlying,
43            _phantom_order: Default::default(),
44        }
45    }
46}
47
48impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order, Retries> Clone
49    for KeyedStream<K, V, Loc, Bound, Order, Retries>
50{
51    fn clone(&self) -> Self {
52        KeyedStream {
53            underlying: self.underlying.clone(),
54            _phantom_order: PhantomData,
55        }
56    }
57}
58
59impl<'a, K, V, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker>
60    for KeyedStream<K, V, L, B, O, R>
61where
62    L: Location<'a> + NoTick,
63{
64    type Location = L;
65
66    fn create_source(ident: syn::Ident, location: L) -> Self {
67        Stream::create_source(ident, location).into_keyed()
68    }
69}
70
71impl<'a, K, V, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker>
72    for KeyedStream<K, V, L, B, O, R>
73where
74    L: Location<'a> + NoTick,
75{
76    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77        self.underlying.complete(ident, expected_location);
78    }
79}
80
81impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R> {
82    /// Explicitly "casts" the keyed stream to a type with a different ordering
83    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
84    /// by the type-system.
85    ///
86    /// # Non-Determinism
87    /// This function is used as an escape hatch, and any mistakes in the
88    /// provided ordering guarantee will propagate into the guarantees
89    /// for the rest of the program.
90    pub fn assume_ordering<O2>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
91        KeyedStream {
92            underlying: self.underlying,
93            _phantom_order: PhantomData,
94        }
95    }
96
97    /// Explicitly "casts" the keyed stream to a type with a different retries
98    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
99    /// be proven by the type-system.
100    ///
101    /// # Non-Determinism
102    /// This function is used as an escape hatch, and any mistakes in the
103    /// provided retries guarantee will propagate into the guarantees
104    /// for the rest of the program.
105    pub fn assume_retries<R2>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
106        KeyedStream {
107            underlying: self.underlying.assume_retries::<R2>(nondet),
108            _phantom_order: PhantomData,
109        }
110    }
111
112    /// Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic
113    /// element ordering.
114    ///
115    /// # Example
116    /// ```rust
117    /// # use hydro_lang::*;
118    /// # use futures::StreamExt;
119    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
120    /// process
121    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
122    ///     .into_keyed()
123    ///     .entries()
124    /// # }, |mut stream| async move {
125    /// // (1, 2), (1, 3), (2, 4) in any order
126    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
127    /// #     assert_eq!(stream.next().await.unwrap(), w);
128    /// # }
129    /// # }));
130    /// ```
131    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
132        self.underlying
133    }
134
135    /// Flattens the keyed stream into a single stream of only the values, with non-deterministic
136    /// element ordering.
137    ///
138    /// # Example
139    /// ```rust
140    /// # use hydro_lang::*;
141    /// # use futures::StreamExt;
142    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
143    /// process
144    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
145    ///     .into_keyed()
146    ///     .values()
147    /// # }, |mut stream| async move {
148    /// // 2, 3, 4 in any order
149    /// # for w in vec![2, 3, 4] {
150    /// #     assert_eq!(stream.next().await.unwrap(), w);
151    /// # }
152    /// # }));
153    /// ```
154    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
155        self.underlying.map(q!(|(_, v)| v))
156    }
157
158    /// Transforms each value by invoking `f` on each element, with keys staying the same
159    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
160    ///
161    /// If you do not want to modify the stream and instead only want to view
162    /// each item use [`KeyedStream::inspect`] instead.
163    ///
164    /// # Example
165    /// ```rust
166    /// # use hydro_lang::*;
167    /// # use futures::StreamExt;
168    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
169    /// process
170    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
171    ///     .into_keyed()
172    ///     .map(q!(|v| v + 1))
173    /// #   .entries()
174    /// # }, |mut stream| async move {
175    /// // { 1: [3, 4], 2: [5] }
176    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
177    /// #     assert_eq!(stream.next().await.unwrap(), w);
178    /// # }
179    /// # }));
180    /// ```
181    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
182    where
183        F: Fn(V) -> U + 'a,
184    {
185        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
186        KeyedStream {
187            underlying: self.underlying.map(q!({
188                let orig = f;
189                move |(k, v)| (k, orig(v))
190            })),
191            _phantom_order: Default::default(),
192        }
193    }
194
195    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
196    /// re-grouped even they are tuples; instead they will be grouped under the original key.
197    ///
198    /// If you do not want to modify the stream and instead only want to view
199    /// each item use [`KeyedStream::inspect_with_key`] instead.
200    ///
201    /// # Example
202    /// ```rust
203    /// # use hydro_lang::*;
204    /// # use futures::StreamExt;
205    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
206    /// process
207    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
208    ///     .into_keyed()
209    ///     .map_with_key(q!(|(k, v)| k + v))
210    /// #   .entries()
211    /// # }, |mut stream| async move {
212    /// // { 1: [3, 4], 2: [6] }
213    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
214    /// #     assert_eq!(stream.next().await.unwrap(), w);
215    /// # }
216    /// # }));
217    /// ```
218    pub fn map_with_key<U, F>(
219        self,
220        f: impl IntoQuotedMut<'a, F, L> + Copy,
221    ) -> KeyedStream<K, U, L, B, O, R>
222    where
223        F: Fn((K, V)) -> U + 'a,
224        K: Clone,
225    {
226        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
227        KeyedStream {
228            underlying: self.underlying.map(q!({
229                let orig = f;
230                move |(k, v)| {
231                    let out = orig((k.clone(), v));
232                    (k, out)
233                }
234            })),
235            _phantom_order: Default::default(),
236        }
237    }
238
239    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
240    /// `f`, preserving the order of the elements within the group.
241    ///
242    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
243    /// not modify or take ownership of the values. If you need to modify the values while filtering
244    /// use [`KeyedStream::filter_map`] instead.
245    ///
246    /// # Example
247    /// ```rust
248    /// # use hydro_lang::*;
249    /// # use futures::StreamExt;
250    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
251    /// process
252    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
253    ///     .into_keyed()
254    ///     .filter(q!(|&x| x > 2))
255    /// #   .entries()
256    /// # }, |mut stream| async move {
257    /// // { 1: [3], 2: [4] }
258    /// # for w in vec![(1, 3), (2, 4)] {
259    /// #     assert_eq!(stream.next().await.unwrap(), w);
260    /// # }
261    /// # }));
262    /// ```
263    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
264    where
265        F: Fn(&V) -> bool + 'a,
266    {
267        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
268        KeyedStream {
269            underlying: self.underlying.filter(q!({
270                let orig = f;
271                move |(_k, v)| orig(v)
272            })),
273            _phantom_order: Default::default(),
274        }
275    }
276
277    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
278    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
279    ///
280    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
281    /// not modify or take ownership of the values. If you need to modify the values while filtering
282    /// use [`KeyedStream::filter_map_with_key`] instead.
283    ///
284    /// # Example
285    /// ```rust
286    /// # use hydro_lang::*;
287    /// # use futures::StreamExt;
288    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
289    /// process
290    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
291    ///     .into_keyed()
292    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
293    /// #   .entries()
294    /// # }, |mut stream| async move {
295    /// // { 1: [3], 2: [4] }
296    /// # for w in vec![(1, 3), (2, 4)] {
297    /// #     assert_eq!(stream.next().await.unwrap(), w);
298    /// # }
299    /// # }));
300    /// ```
301    pub fn filter_with_key<F>(
302        self,
303        f: impl IntoQuotedMut<'a, F, L> + Copy,
304    ) -> KeyedStream<K, V, L, B, O, R>
305    where
306        F: Fn(&(K, V)) -> bool + 'a,
307    {
308        KeyedStream {
309            underlying: self.underlying.filter(f),
310            _phantom_order: Default::default(),
311        }
312    }
313
314    /// An operator that both filters and maps each value, with keys staying the same.
315    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
316    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
317    ///
318    /// # Example
319    /// ```rust
320    /// # use hydro_lang::*;
321    /// # use futures::StreamExt;
322    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
323    /// process
324    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
325    ///     .into_keyed()
326    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
327    /// #   .entries()
328    /// # }, |mut stream| async move {
329    /// // { 1: [2], 2: [4] }
330    /// # for w in vec![(1, 2), (2, 4)] {
331    /// #     assert_eq!(stream.next().await.unwrap(), w);
332    /// # }
333    /// # }));
334    /// ```
335    pub fn filter_map<U, F>(
336        self,
337        f: impl IntoQuotedMut<'a, F, L> + Copy,
338    ) -> KeyedStream<K, U, L, B, O, R>
339    where
340        F: Fn(V) -> Option<U> + 'a,
341    {
342        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
343        KeyedStream {
344            underlying: self.underlying.filter_map(q!({
345                let orig = f;
346                move |(k, v)| orig(v).map(|o| (k, o))
347            })),
348            _phantom_order: Default::default(),
349        }
350    }
351
352    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
353    /// re-grouped even they are tuples; instead they will be grouped under the original key.
354    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
355    ///
356    /// # Example
357    /// ```rust
358    /// # use hydro_lang::*;
359    /// # use futures::StreamExt;
360    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
361    /// process
362    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
363    ///     .into_keyed()
364    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
365    /// #   .entries()
366    /// # }, |mut stream| async move {
367    /// // { 2: [2] }
368    /// # for w in vec![(2, 2)] {
369    /// #     assert_eq!(stream.next().await.unwrap(), w);
370    /// # }
371    /// # }));
372    /// ```
373    pub fn filter_map_with_key<U, F>(
374        self,
375        f: impl IntoQuotedMut<'a, F, L> + Copy,
376    ) -> KeyedStream<K, U, L, B, O, R>
377    where
378        F: Fn((K, V)) -> Option<U> + 'a,
379        K: Clone,
380    {
381        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
382        KeyedStream {
383            underlying: self.underlying.filter_map(q!({
384                let orig = f;
385                move |(k, v)| {
386                    let out = orig((k.clone(), v));
387                    out.map(|o| (k, o))
388                }
389            })),
390            _phantom_order: Default::default(),
391        }
392    }
393
394    /// An operator which allows you to "inspect" each element of a stream without
395    /// modifying it. The closure `f` is called on a reference to each value. This is
396    /// mainly useful for debugging, and should not be used to generate side-effects.
397    ///
398    /// # Example
399    /// ```rust
400    /// # use hydro_lang::*;
401    /// # use futures::StreamExt;
402    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
403    /// process
404    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
405    ///     .into_keyed()
406    ///     .inspect(q!(|v| println!("{}", v)))
407    /// #   .entries()
408    /// # }, |mut stream| async move {
409    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
410    /// #     assert_eq!(stream.next().await.unwrap(), w);
411    /// # }
412    /// # }));
413    /// ```
414    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
415    where
416        F: Fn(&V) + 'a,
417    {
418        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
419        KeyedStream {
420            underlying: self.underlying.inspect(q!({
421                let orig = f;
422                move |(_k, v)| orig(v)
423            })),
424            _phantom_order: Default::default(),
425        }
426    }
427
428    /// An operator which allows you to "inspect" each element of a stream without
429    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
430    /// mainly useful for debugging, and should not be used to generate side-effects.
431    ///
432    /// # Example
433    /// ```rust
434    /// # use hydro_lang::*;
435    /// # use futures::StreamExt;
436    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
437    /// process
438    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
439    ///     .into_keyed()
440    ///     .inspect(q!(|v| println!("{}", v)))
441    /// #   .entries()
442    /// # }, |mut stream| async move {
443    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
444    /// #     assert_eq!(stream.next().await.unwrap(), w);
445    /// # }
446    /// # }));
447    /// ```
448    pub fn inspect_with_key<F>(
449        self,
450        f: impl IntoQuotedMut<'a, F, L>,
451    ) -> KeyedStream<K, V, L, B, O, R>
452    where
453        F: Fn(&(K, V)) + 'a,
454    {
455        KeyedStream {
456            underlying: self.underlying.inspect(f),
457            _phantom_order: Default::default(),
458        }
459    }
460
461    /// An operator which allows you to "name" a `HydroNode`.
462    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
463    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
464        {
465            let mut node = self.underlying.ir_node.borrow_mut();
466            let metadata = node.metadata_mut();
467            metadata.tag = Some(name.to_string());
468        }
469        self
470    }
471}
472
473impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R> {
474    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
475    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
476    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
477    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
478    ///
479    /// Currently, both input streams must be [`Unbounded`].
480    ///
481    /// # Example
482    /// ```rust
483    /// # use hydro_lang::*;
484    /// # use futures::StreamExt;
485    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
486    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
487    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
488    /// numbers1.interleave(numbers2)
489    /// #   .entries()
490    /// # }, |mut stream| async move {
491    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
492    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
493    /// #     assert_eq!(stream.next().await.unwrap(), w);
494    /// # }
495    /// # }));
496    /// ```
497    pub fn interleave<O2, R2: MinRetries<R>>(
498        self,
499        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
500    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
501    where
502        R: MinRetries<R2, Min = R2::Min>,
503    {
504        self.entries().interleave(other.entries()).into_keyed()
505    }
506}
507
508impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
509where
510    K: Eq + Hash,
511    L: Location<'a>,
512{
513    /// A special case of [`Stream::scan`] for keyd streams. For each key group the values are transformed via the `f` combinator.
514    ///
515    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
516    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
517    /// early by returning `None`.
518    ///
519    /// The function takes a mutable reference to the accumulator and the current element, and returns
520    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
521    /// If the function returns `None`, the stream is terminated and no more elements are processed.
522    ///
523    /// # Example
524    /// ```rust
525    /// # use hydro_lang::*;
526    /// # use futures::StreamExt;
527    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
528    /// process
529    ///     .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
530    ///     .into_keyed()
531    ///     .scan(
532    ///         q!(|| 0),
533    ///         q!(|acc, x| {
534    ///             *acc += x;
535    ///             Some(*acc)
536    ///         }),
537    ///     )
538    /// #   .entries()
539    /// # }, |mut stream| async move {
540    /// // Output: { 0: [1, 3], 1: [3, 7] }
541    /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
542    /// #     assert_eq!(stream.next().await.unwrap(), w);
543    /// # }
544    /// # }));
545    /// ```
546    pub fn scan<A, U, I, F>(
547        self,
548        init: impl IntoQuotedMut<'a, I, L> + Copy,
549        f: impl IntoQuotedMut<'a, F, L> + Copy,
550    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
551    where
552        K: Clone,
553        I: Fn() -> A + 'a,
554        F: Fn(&mut A, V) -> Option<U> + 'a,
555    {
556        KeyedStream {
557            underlying: self
558                .underlying
559                .assume_ordering::<TotalOrder>(
560                    nondet!(/** keyed scan does not rely on order of keys */),
561                )
562                .scan_keyed(init, f)
563                .into(),
564            _phantom_order: Default::default(),
565        }
566    }
567
568    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed in-order across the values
569    /// in each group. But the aggregation function returns a boolean, which when true indicates that the aggregated
570    /// result is complete and can be released to downstream computation. Unlike [`Stream::fold_keyed`], this means that
571    /// even if the input stream is [`crate::Unbounded`], the outputs of the fold can be processed like normal stream elements.
572    ///
573    /// # Example
574    /// ```rust
575    /// # use hydro_lang::*;
576    /// # use futures::StreamExt;
577    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
578    /// process
579    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
580    ///     .into_keyed()
581    ///     .fold_early_stop(
582    ///         q!(|| 0),
583    ///         q!(|acc, x| {
584    ///             *acc += x;
585    ///             x % 2 == 0
586    ///         }),
587    ///     )
588    /// #   .entries()
589    /// # }, |mut stream| async move {
590    /// // Output: { 0: 2, 1: 9 }
591    /// # for w in vec![(0, 2), (1, 9)] {
592    /// #     assert_eq!(stream.next().await.unwrap(), w);
593    /// # }
594    /// # }));
595    /// ```
596    pub fn fold_early_stop<A, I, F>(
597        self,
598        init: impl IntoQuotedMut<'a, I, L> + Copy,
599        f: impl IntoQuotedMut<'a, F, L> + Copy,
600    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
601    where
602        K: Clone,
603        I: Fn() -> A + 'a,
604        F: Fn(&mut A, V) -> bool + 'a,
605    {
606        KeyedSingleton {
607            underlying: {
608                self.underlying
609                    .assume_ordering::<TotalOrder>(
610                        nondet!(/** keyed fold does not rely on order of keys */),
611                    )
612                    .fold_keyed_early_stop(init, f)
613                    .into()
614            },
615        }
616    }
617
618    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
619    ///
620    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
621    /// to depend on the order of elements in the group.
622    ///
623    /// If the input and output value types are the same and do not require initialization then use
624    /// [`KeyedStream::reduce`].
625    ///
626    /// # Example
627    /// ```rust
628    /// # use hydro_lang::*;
629    /// # use futures::StreamExt;
630    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
631    /// let tick = process.tick();
632    /// let numbers = process
633    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
634    ///     .into_keyed();
635    /// let batch = numbers.batch(&tick, nondet!(/** test */));
636    /// batch
637    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
638    ///     .entries()
639    ///     .all_ticks()
640    /// # }, |mut stream| async move {
641    /// // (1, 5), (2, 7)
642    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
643    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
644    /// # }));
645    /// ```
646    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
647        self,
648        init: impl IntoQuotedMut<'a, I, L>,
649        comb: impl IntoQuotedMut<'a, F, L>,
650    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
651        let init = init.splice_fn0_ctx(&self.underlying.location).into();
652        let comb = comb
653            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
654            .into();
655
656        let out_ir = HydroNode::FoldKeyed {
657            init,
658            acc: comb,
659            input: Box::new(self.underlying.ir_node.into_inner()),
660            metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
661        };
662
663        KeyedSingleton {
664            underlying: Stream::new(self.underlying.location, out_ir),
665        }
666    }
667
668    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
669    ///
670    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
671    /// to depend on the order of elements in the stream.
672    ///
673    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
674    ///
675    /// # Example
676    /// ```rust
677    /// # use hydro_lang::*;
678    /// # use futures::StreamExt;
679    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
680    /// let tick = process.tick();
681    /// let numbers = process
682    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
683    ///     .into_keyed();
684    /// let batch = numbers.batch(&tick, nondet!(/** test */));
685    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
686    /// # }, |mut stream| async move {
687    /// // (1, 5), (2, 7)
688    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
689    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
690    /// # }));
691    /// ```
692    pub fn reduce<F: Fn(&mut V, V) + 'a>(
693        self,
694        comb: impl IntoQuotedMut<'a, F, L>,
695    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
696        let f = comb
697            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
698            .into();
699
700        let out_ir = HydroNode::ReduceKeyed {
701            f,
702            input: Box::new(self.underlying.ir_node.into_inner()),
703            metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
704        };
705
706        KeyedSingleton {
707            underlying: Stream::new(self.underlying.location, out_ir),
708        }
709    }
710
711    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
712    ///
713    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
714    /// to depend on the order of elements in the stream.
715    ///
716    /// # Example
717    /// ```rust
718    /// # use hydro_lang::*;
719    /// # use futures::StreamExt;
720    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
721    /// let tick = process.tick();
722    /// let watermark = tick.singleton(q!(1));
723    /// let numbers = process
724    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
725    ///     .into_keyed();
726    /// let batch = numbers.batch(&tick, nondet!(/** test */));
727    /// batch
728    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
729    ///     .entries()
730    ///     .all_ticks()
731    /// # }, |mut stream| async move {
732    /// // (2, 204)
733    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
734    /// # }));
735    /// ```
736    pub fn reduce_watermark<O, F>(
737        self,
738        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
739        comb: impl IntoQuotedMut<'a, F, L>,
740    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
741    where
742        O: Clone,
743        F: Fn(&mut V, V) + 'a,
744    {
745        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
746        check_matching_location(&self.underlying.location.root(), other.location.outer());
747        let f = comb
748            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
749            .into();
750
751        let out_ir = Stream::new(
752            self.underlying.location.clone(),
753            HydroNode::ReduceKeyedWatermark {
754                f,
755                input: Box::new(self.underlying.ir_node.into_inner()),
756                watermark: Box::new(other.ir_node.into_inner()),
757                metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
758            },
759        );
760
761        KeyedSingleton { underlying: out_ir }
762    }
763}
764
765impl<'a, K, V, L, B: Boundedness, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
766where
767    K: Eq + Hash,
768    L: Location<'a>,
769{
770    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
771    ///
772    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
773    ///
774    /// If the input and output value types are the same and do not require initialization then use
775    /// [`KeyedStream::reduce_commutative`].
776    ///
777    /// # Example
778    /// ```rust
779    /// # use hydro_lang::*;
780    /// # use futures::StreamExt;
781    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
782    /// let tick = process.tick();
783    /// let numbers = process
784    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
785    ///     .into_keyed();
786    /// let batch = numbers.batch(&tick, nondet!(/** test */));
787    /// batch
788    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
789    ///     .entries()
790    ///     .all_ticks()
791    /// # }, |mut stream| async move {
792    /// // (1, 5), (2, 7)
793    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
794    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
795    /// # }));
796    /// ```
797    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
798        self,
799        init: impl IntoQuotedMut<'a, I, L>,
800        comb: impl IntoQuotedMut<'a, F, L>,
801    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
802        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
803            .fold(init, comb)
804    }
805
806    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
807    ///
808    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
809    ///
810    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
811    ///
812    /// # Example
813    /// ```rust
814    /// # use hydro_lang::*;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
817    /// let tick = process.tick();
818    /// let numbers = process
819    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
820    ///     .into_keyed();
821    /// let batch = numbers.batch(&tick, nondet!(/** test */));
822    /// batch
823    ///     .reduce_commutative(q!(|acc, x| *acc += x))
824    ///     .entries()
825    ///     .all_ticks()
826    /// # }, |mut stream| async move {
827    /// // (1, 5), (2, 7)
828    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
829    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
830    /// # }));
831    /// ```
832    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
833        self,
834        comb: impl IntoQuotedMut<'a, F, L>,
835    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
836        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
837            .reduce(comb)
838    }
839
840    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
841    ///
842    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
843    ///
844    /// # Example
845    /// ```rust
846    /// # use hydro_lang::*;
847    /// # use futures::StreamExt;
848    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
849    /// let tick = process.tick();
850    /// let watermark = tick.singleton(q!(1));
851    /// let numbers = process
852    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
853    ///     .into_keyed();
854    /// let batch = numbers.batch(&tick, nondet!(/** test */));
855    /// batch
856    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
857    ///     .entries()
858    ///     .all_ticks()
859    /// # }, |mut stream| async move {
860    /// // (2, 204)
861    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
862    /// # }));
863    /// ```
864    pub fn reduce_watermark_commutative<O2, F>(
865        self,
866        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
867        comb: impl IntoQuotedMut<'a, F, L>,
868    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
869    where
870        O2: Clone,
871        F: Fn(&mut V, V) + 'a,
872    {
873        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
874            .reduce_watermark(other, comb)
875    }
876}
877
878impl<'a, K, V, L, B: Boundedness, R> KeyedStream<K, V, L, B, TotalOrder, R>
879where
880    K: Eq + Hash,
881    L: Location<'a>,
882{
883    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
884    ///
885    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
886    ///
887    /// If the input and output value types are the same and do not require initialization then use
888    /// [`KeyedStream::reduce_idempotent`].
889    ///
890    /// # Example
891    /// ```rust
892    /// # use hydro_lang::*;
893    /// # use futures::StreamExt;
894    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
895    /// let tick = process.tick();
896    /// let numbers = process
897    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
898    ///     .into_keyed();
899    /// let batch = numbers.batch(&tick, nondet!(/** test */));
900    /// batch
901    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
902    ///     .entries()
903    ///     .all_ticks()
904    /// # }, |mut stream| async move {
905    /// // (1, false), (2, true)
906    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
907    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
908    /// # }));
909    /// ```
910    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
911        self,
912        init: impl IntoQuotedMut<'a, I, L>,
913        comb: impl IntoQuotedMut<'a, F, L>,
914    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
915        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
916            .fold(init, comb)
917    }
918
919    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
920    ///
921    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
922    ///
923    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
924    ///
925    /// # Example
926    /// ```rust
927    /// # use hydro_lang::*;
928    /// # use futures::StreamExt;
929    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
930    /// let tick = process.tick();
931    /// let numbers = process
932    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
933    ///     .into_keyed();
934    /// let batch = numbers.batch(&tick, nondet!(/** test */));
935    /// batch
936    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
937    ///     .entries()
938    ///     .all_ticks()
939    /// # }, |mut stream| async move {
940    /// // (1, false), (2, true)
941    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
942    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
943    /// # }));
944    /// ```
945    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
946        self,
947        comb: impl IntoQuotedMut<'a, F, L>,
948    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
949        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
950            .reduce(comb)
951    }
952
953    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
954    ///
955    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
956    ///
957    /// # Example
958    /// ```rust
959    /// # use hydro_lang::*;
960    /// # use futures::StreamExt;
961    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
962    /// let tick = process.tick();
963    /// let watermark = tick.singleton(q!(1));
964    /// let numbers = process
965    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
966    ///     .into_keyed();
967    /// let batch = numbers.batch(&tick, nondet!(/** test */));
968    /// batch
969    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
970    ///     .entries()
971    ///     .all_ticks()
972    /// # }, |mut stream| async move {
973    /// // (2, true)
974    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
975    /// # }));
976    /// ```
977    pub fn reduce_watermark_idempotent<O2, F>(
978        self,
979        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
980        comb: impl IntoQuotedMut<'a, F, L>,
981    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
982    where
983        O2: Clone,
984        F: Fn(&mut V, V) + 'a,
985    {
986        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
987            .reduce_watermark(other, comb)
988    }
989}
990
991impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
992where
993    K: Eq + Hash,
994    L: Location<'a>,
995{
996    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
997    ///
998    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
999    /// as there may be non-deterministic duplicates.
1000    ///
1001    /// If the input and output value types are the same and do not require initialization then use
1002    /// [`KeyedStream::reduce_commutative_idempotent`].
1003    ///
1004    /// # Example
1005    /// ```rust
1006    /// # use hydro_lang::*;
1007    /// # use futures::StreamExt;
1008    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1009    /// let tick = process.tick();
1010    /// let numbers = process
1011    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1012    ///     .into_keyed();
1013    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1014    /// batch
1015    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1016    ///     .entries()
1017    ///     .all_ticks()
1018    /// # }, |mut stream| async move {
1019    /// // (1, false), (2, true)
1020    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1021    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1022    /// # }));
1023    /// ```
1024    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1025        self,
1026        init: impl IntoQuotedMut<'a, I, L>,
1027        comb: impl IntoQuotedMut<'a, F, L>,
1028    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1029        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1030            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1031            .fold(init, comb)
1032    }
1033
1034    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1035    ///
1036    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1037    /// as there may be non-deterministic duplicates.
1038    ///
1039    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1040    ///
1041    /// # Example
1042    /// ```rust
1043    /// # use hydro_lang::*;
1044    /// # use futures::StreamExt;
1045    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1046    /// let tick = process.tick();
1047    /// let numbers = process
1048    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1049    ///     .into_keyed();
1050    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1051    /// batch
1052    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1053    ///     .entries()
1054    ///     .all_ticks()
1055    /// # }, |mut stream| async move {
1056    /// // (1, false), (2, true)
1057    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1058    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1059    /// # }));
1060    /// ```
1061    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1062        self,
1063        comb: impl IntoQuotedMut<'a, F, L>,
1064    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1065        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1066            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1067            .reduce(comb)
1068    }
1069
1070    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1071    ///
1072    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1073    /// as there may be non-deterministic duplicates.
1074    ///
1075    /// # Example
1076    /// ```rust
1077    /// # use hydro_lang::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1080    /// let tick = process.tick();
1081    /// let watermark = tick.singleton(q!(1));
1082    /// let numbers = process
1083    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1084    ///     .into_keyed();
1085    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1086    /// batch
1087    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1088    ///     .entries()
1089    ///     .all_ticks()
1090    /// # }, |mut stream| async move {
1091    /// // (2, true)
1092    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1093    /// # }));
1094    /// ```
1095    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1096        self,
1097        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1098        comb: impl IntoQuotedMut<'a, F, L>,
1099    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1100    where
1101        O2: Clone,
1102        F: Fn(&mut V, V) + 'a,
1103    {
1104        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1105            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1106            .reduce_watermark(other, comb)
1107    }
1108
1109    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1110    /// whose keys are not in the bounded stream.
1111    ///
1112    /// # Example
1113    /// ```rust
1114    /// # use hydro_lang::*;
1115    /// # use futures::StreamExt;
1116    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1117    /// let tick = process.tick();
1118    /// let keyed_stream = process
1119    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1120    ///     .batch(&tick, nondet!(/** test */))
1121    ///     .into_keyed();
1122    /// let keys_to_remove = process
1123    ///     .source_iter(q!(vec![1, 2]))
1124    ///     .batch(&tick, nondet!(/** test */));
1125    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1126    /// #   .entries()
1127    /// # }, |mut stream| async move {
1128    /// // { 3: ['c'], 4: ['d'] }
1129    /// # for w in vec![(3, 'c'), (4, 'd')] {
1130    /// #     assert_eq!(stream.next().await.unwrap(), w);
1131    /// # }
1132    /// # }));
1133    pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self {
1134        KeyedStream {
1135            underlying: self.entries().anti_join(other),
1136            _phantom_order: Default::default(),
1137        }
1138    }
1139}
1140
1141impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
1142where
1143    L: Location<'a> + NoTick + NoAtomic,
1144{
1145    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1146        KeyedStream {
1147            underlying: self.underlying.atomic(tick),
1148            _phantom_order: Default::default(),
1149        }
1150    }
1151
1152    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1153    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1154    /// the order of the input.
1155    ///
1156    /// # Non-Determinism
1157    /// The batch boundaries are non-deterministic and may change across executions.
1158    pub fn batch(
1159        self,
1160        tick: &Tick<L>,
1161        nondet: NonDet,
1162    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1163        self.atomic(tick).batch(nondet)
1164    }
1165}
1166
1167impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
1168where
1169    L: Location<'a> + NoTick + NoAtomic,
1170{
1171    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1172    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1173    /// the order of the input.
1174    ///
1175    /// # Non-Determinism
1176    /// The batch boundaries are non-deterministic and may change across executions.
1177    pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1178        KeyedStream {
1179            underlying: self.underlying.batch(nondet),
1180            _phantom_order: Default::default(),
1181        }
1182    }
1183}
1184
1185impl<'a, K, V, L, O, R> KeyedStream<K, V, L, Bounded, O, R>
1186where
1187    L: Location<'a>,
1188{
1189    pub fn chain<O2>(
1190        self,
1191        other: KeyedStream<K, V, L, Bounded, O2, R>,
1192    ) -> KeyedStream<K, V, L, Bounded, O::Min, R>
1193    where
1194        O: MinOrder<O2>,
1195    {
1196        KeyedStream {
1197            underlying: self.underlying.chain(other.underlying),
1198            _phantom_order: Default::default(),
1199        }
1200    }
1201}
1202
1203impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1204where
1205    L: Location<'a>,
1206{
1207    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1208        KeyedStream {
1209            underlying: self.underlying.all_ticks(),
1210            _phantom_order: Default::default(),
1211        }
1212    }
1213}
1214
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::location::Location;
    use crate::{FlowBuilder, nondet};

    // Runs `reduce_watermark` with a constant watermark of 1 over a fixed input and checks that
    // the first entry received on the external side is the sum for key 2.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external = builder.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        // Key 2 appears twice with value 102, hence the expected sum of 204.
        let output_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting the deployment.
        let mut received = deployed.connect_source_bincode(output_port).await;

        deployment.start().await.unwrap();

        let first_entry = received.next().await.unwrap();
        assert_eq!(first_entry, (2, 204));
    }

    // Runs `reduce_watermark_commutative` with a watermark cycle that is seeded with 1 and
    // completed on the next tick with the current value plus one. After the first entry
    // (2, 204) is observed, an external trigger releases (3, 103), which must be the next
    // entry received.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external = builder.external::<()>();
        let (trigger_port, trigger_stream) = process.source_external_bincode(&external);

        let tick = process.tick();
        let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let advanced_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(advanced_watermark);

        // (3, 103) is only let through in ticks where a trigger message is present.
        let gated_input = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */))
            .continue_if(
                trigger_stream
                    .clone()
                    .batch(&tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let output_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(gated_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = builder
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both endpoints before starting the deployment.
        let mut trigger_sink = deployed.connect_sink_bincode(trigger_port).await;
        let mut received = deployed.connect_source_bincode(output_port).await;

        deployment.start().await.unwrap();

        let first_entry = received.next().await.unwrap();
        assert_eq!(first_entry, (2, 204));

        trigger_sink.send(()).await.unwrap();

        let second_entry = received.next().await.unwrap();
        assert_eq!(second_entry, (3, 103));
    }
}