hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::tick::NoAtomic;
20use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
21use crate::manual_expr::ManualExpr;
22use crate::nondet::{NonDet, nondet};
23
24pub mod networking;
25
26/// Streaming elements of type `V` grouped by a key of type `K`.
27///
28/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
29/// order of keys is non-deterministic but the order *within* each group may be deterministic.
30///
31/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
32/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
33/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
34///
35/// Type Parameters:
36/// - `K`: the type of the key for each group
37/// - `V`: the type of the elements inside each group
38/// - `Loc`: the [`Location`] where the keyed stream is materialized
39/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
40/// - `Order`: tracks whether the elements within each group have deterministic order
41///   ([`TotalOrder`]) or not ([`NoOrder`])
42/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
43///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
44pub struct KeyedStream<
45    K,
46    V,
47    Loc,
48    Bound: Boundedness,
49    Order: Ordering = TotalOrder,
50    Retry: Retries = ExactlyOnce,
51> {
52    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
53    pub(crate) _phantom_order: PhantomData<Order>,
54}
55
56impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
57    for KeyedStream<K, V, L, B, NoOrder, R>
58where
59    L: Location<'a>,
60{
61    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
62        KeyedStream {
63            underlying: stream.underlying,
64            _phantom_order: Default::default(),
65        }
66    }
67}
68
69impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
70    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
71{
72    fn clone(&self) -> Self {
73        KeyedStream {
74            underlying: self.underlying.clone(),
75            _phantom_order: PhantomData,
76        }
77    }
78}
79
80impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
81    for KeyedStream<K, V, L, B, O, R>
82where
83    L: Location<'a> + NoTick,
84{
85    type Location = L;
86
87    fn create_source(ident: syn::Ident, location: L) -> Self {
88        Stream::create_source(ident, location).into_keyed()
89    }
90}
91
92impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
93    for KeyedStream<K, V, L, B, O, R>
94where
95    L: Location<'a> + NoTick,
96{
97    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98        self.underlying.complete(ident, expected_location);
99    }
100}
101
102impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
103    KeyedStream<K, V, L, B, O, R>
104{
105    /// Explicitly "casts" the keyed stream to a type with a different ordering
106    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
107    /// by the type-system.
108    ///
109    /// # Non-Determinism
110    /// This function is used as an escape hatch, and any mistakes in the
111    /// provided ordering guarantee will propagate into the guarantees
112    /// for the rest of the program.
113    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
114        KeyedStream {
115            underlying: self.underlying,
116            _phantom_order: PhantomData,
117        }
118    }
119
120    /// Explicitly "casts" the keyed stream to a type with a different retries
121    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
122    /// be proven by the type-system.
123    ///
124    /// # Non-Determinism
125    /// This function is used as an escape hatch, and any mistakes in the
126    /// provided retries guarantee will propagate into the guarantees
127    /// for the rest of the program.
128    pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
129        KeyedStream {
130            underlying: self.underlying.assume_retries::<R2>(nondet),
131            _phantom_order: PhantomData,
132        }
133    }
134
135    /// Flattens the keyed stream into an unordered stream of key-value pairs.
136    ///
137    /// # Example
138    /// ```rust
139    /// # use hydro_lang::prelude::*;
140    /// # use futures::StreamExt;
141    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
142    /// process
143    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
144    ///     .into_keyed()
145    ///     .entries()
146    /// # }, |mut stream| async move {
147    /// // (1, 2), (1, 3), (2, 4) in any order
148    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
149    /// #     assert_eq!(stream.next().await.unwrap(), w);
150    /// # }
151    /// # }));
152    /// ```
153    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
154        self.underlying
155    }
156
157    /// Flattens the keyed stream into an unordered stream of only the values.
158    ///
159    /// # Example
160    /// ```rust
161    /// # use hydro_lang::prelude::*;
162    /// # use futures::StreamExt;
163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164    /// process
165    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
166    ///     .into_keyed()
167    ///     .values()
168    /// # }, |mut stream| async move {
169    /// // 2, 3, 4 in any order
170    /// # for w in vec![2, 3, 4] {
171    /// #     assert_eq!(stream.next().await.unwrap(), w);
172    /// # }
173    /// # }));
174    /// ```
175    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
176        self.underlying.map(q!(|(_, v)| v))
177    }
178
179    /// Transforms each value by invoking `f` on each element, with keys staying the same
180    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
181    ///
182    /// If you do not want to modify the stream and instead only want to view
183    /// each item use [`KeyedStream::inspect`] instead.
184    ///
185    /// # Example
186    /// ```rust
187    /// # use hydro_lang::prelude::*;
188    /// # use futures::StreamExt;
189    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
190    /// process
191    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
192    ///     .into_keyed()
193    ///     .map(q!(|v| v + 1))
194    /// #   .entries()
195    /// # }, |mut stream| async move {
196    /// // { 1: [3, 4], 2: [5] }
197    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
198    /// #     assert_eq!(stream.next().await.unwrap(), w);
199    /// # }
200    /// # }));
201    /// ```
202    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
203    where
204        F: Fn(V) -> U + 'a,
205    {
206        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
207        KeyedStream {
208            underlying: self.underlying.map(q!({
209                let orig = f;
210                move |(k, v)| (k, orig(v))
211            })),
212            _phantom_order: Default::default(),
213        }
214    }
215
216    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
217    /// re-grouped even they are tuples; instead they will be grouped under the original key.
218    ///
219    /// If you do not want to modify the stream and instead only want to view
220    /// each item use [`KeyedStream::inspect_with_key`] instead.
221    ///
222    /// # Example
223    /// ```rust
224    /// # use hydro_lang::prelude::*;
225    /// # use futures::StreamExt;
226    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
227    /// process
228    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
229    ///     .into_keyed()
230    ///     .map_with_key(q!(|(k, v)| k + v))
231    /// #   .entries()
232    /// # }, |mut stream| async move {
233    /// // { 1: [3, 4], 2: [6] }
234    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
235    /// #     assert_eq!(stream.next().await.unwrap(), w);
236    /// # }
237    /// # }));
238    /// ```
239    pub fn map_with_key<U, F>(
240        self,
241        f: impl IntoQuotedMut<'a, F, L> + Copy,
242    ) -> KeyedStream<K, U, L, B, O, R>
243    where
244        F: Fn((K, V)) -> U + 'a,
245        K: Clone,
246    {
247        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
248        KeyedStream {
249            underlying: self.underlying.map(q!({
250                let orig = f;
251                move |(k, v)| {
252                    let out = orig((k.clone(), v));
253                    (k, out)
254                }
255            })),
256            _phantom_order: Default::default(),
257        }
258    }
259
260    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
261    /// `f`, preserving the order of the elements within the group.
262    ///
263    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
264    /// not modify or take ownership of the values. If you need to modify the values while filtering
265    /// use [`KeyedStream::filter_map`] instead.
266    ///
267    /// # Example
268    /// ```rust
269    /// # use hydro_lang::prelude::*;
270    /// # use futures::StreamExt;
271    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
272    /// process
273    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
274    ///     .into_keyed()
275    ///     .filter(q!(|&x| x > 2))
276    /// #   .entries()
277    /// # }, |mut stream| async move {
278    /// // { 1: [3], 2: [4] }
279    /// # for w in vec![(1, 3), (2, 4)] {
280    /// #     assert_eq!(stream.next().await.unwrap(), w);
281    /// # }
282    /// # }));
283    /// ```
284    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
285    where
286        F: Fn(&V) -> bool + 'a,
287    {
288        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
289        KeyedStream {
290            underlying: self.underlying.filter(q!({
291                let orig = f;
292                move |(_k, v)| orig(v)
293            })),
294            _phantom_order: Default::default(),
295        }
296    }
297
298    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
299    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
300    ///
301    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
302    /// not modify or take ownership of the values. If you need to modify the values while filtering
303    /// use [`KeyedStream::filter_map_with_key`] instead.
304    ///
305    /// # Example
306    /// ```rust
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// process
311    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
312    ///     .into_keyed()
313    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
314    /// #   .entries()
315    /// # }, |mut stream| async move {
316    /// // { 1: [3], 2: [4] }
317    /// # for w in vec![(1, 3), (2, 4)] {
318    /// #     assert_eq!(stream.next().await.unwrap(), w);
319    /// # }
320    /// # }));
321    /// ```
322    pub fn filter_with_key<F>(
323        self,
324        f: impl IntoQuotedMut<'a, F, L> + Copy,
325    ) -> KeyedStream<K, V, L, B, O, R>
326    where
327        F: Fn(&(K, V)) -> bool + 'a,
328    {
329        KeyedStream {
330            underlying: self.underlying.filter(f),
331            _phantom_order: Default::default(),
332        }
333    }
334
335    /// An operator that both filters and maps each value, with keys staying the same.
336    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
337    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
338    ///
339    /// # Example
340    /// ```rust
341    /// # use hydro_lang::prelude::*;
342    /// # use futures::StreamExt;
343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344    /// process
345    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
346    ///     .into_keyed()
347    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
348    /// #   .entries()
349    /// # }, |mut stream| async move {
350    /// // { 1: [2], 2: [4] }
351    /// # for w in vec![(1, 2), (2, 4)] {
352    /// #     assert_eq!(stream.next().await.unwrap(), w);
353    /// # }
354    /// # }));
355    /// ```
356    pub fn filter_map<U, F>(
357        self,
358        f: impl IntoQuotedMut<'a, F, L> + Copy,
359    ) -> KeyedStream<K, U, L, B, O, R>
360    where
361        F: Fn(V) -> Option<U> + 'a,
362    {
363        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
364        KeyedStream {
365            underlying: self.underlying.filter_map(q!({
366                let orig = f;
367                move |(k, v)| orig(v).map(|o| (k, o))
368            })),
369            _phantom_order: Default::default(),
370        }
371    }
372
373    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
374    /// re-grouped even they are tuples; instead they will be grouped under the original key.
375    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
376    ///
377    /// # Example
378    /// ```rust
379    /// # use hydro_lang::prelude::*;
380    /// # use futures::StreamExt;
381    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
382    /// process
383    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
384    ///     .into_keyed()
385    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
386    /// #   .entries()
387    /// # }, |mut stream| async move {
388    /// // { 2: [2] }
389    /// # for w in vec![(2, 2)] {
390    /// #     assert_eq!(stream.next().await.unwrap(), w);
391    /// # }
392    /// # }));
393    /// ```
394    pub fn filter_map_with_key<U, F>(
395        self,
396        f: impl IntoQuotedMut<'a, F, L> + Copy,
397    ) -> KeyedStream<K, U, L, B, O, R>
398    where
399        F: Fn((K, V)) -> Option<U> + 'a,
400        K: Clone,
401    {
402        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
403        KeyedStream {
404            underlying: self.underlying.filter_map(q!({
405                let orig = f;
406                move |(k, v)| {
407                    let out = orig((k.clone(), v));
408                    out.map(|o| (k, o))
409                }
410            })),
411            _phantom_order: Default::default(),
412        }
413    }
414
415    /// For each value `v` in each group, transform `v` using `f` and then treat the
416    /// result as an [`Iterator`] to produce values one by one within the same group.
417    /// The implementation for [`Iterator`] for the output type `I` must produce items
418    /// in a **deterministic** order.
419    ///
420    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
421    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
422    ///
423    /// # Example
424    /// ```rust
425    /// # use hydro_lang::prelude::*;
426    /// # use futures::StreamExt;
427    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
428    /// process
429    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
430    ///     .into_keyed()
431    ///     .flat_map_ordered(q!(|x| x))
432    /// #   .entries()
433    /// # }, |mut stream| async move {
434    /// // { 1: [2, 3, 4], 2: [5, 6] }
435    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
436    /// #     assert_eq!(stream.next().await.unwrap(), w);
437    /// # }
438    /// # }));
439    /// ```
440    pub fn flat_map_ordered<U, I, F>(
441        self,
442        f: impl IntoQuotedMut<'a, F, L> + Copy,
443    ) -> KeyedStream<K, U, L, B, O, R>
444    where
445        I: IntoIterator<Item = U>,
446        F: Fn(V) -> I + 'a,
447        K: Clone,
448    {
449        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
450        KeyedStream {
451            underlying: self.underlying.flat_map_ordered(q!({
452                let orig = f;
453                move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
454            })),
455            _phantom_order: Default::default(),
456        }
457    }
458
459    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
460    /// for the output type `I` to produce items in any order.
461    ///
462    /// # Example
463    /// ```rust
464    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
465    /// # use futures::StreamExt;
466    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
467    /// process
468    ///     .source_iter(q!(vec![
469    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
470    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
471    ///     ]))
472    ///     .into_keyed()
473    ///     .flat_map_unordered(q!(|x| x))
474    /// #   .entries()
475    /// # }, |mut stream| async move {
476    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
477    /// # let mut results = Vec::new();
478    /// # for _ in 0..4 {
479    /// #     results.push(stream.next().await.unwrap());
480    /// # }
481    /// # results.sort();
482    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
483    /// # }));
484    /// ```
485    pub fn flat_map_unordered<U, I, F>(
486        self,
487        f: impl IntoQuotedMut<'a, F, L> + Copy,
488    ) -> KeyedStream<K, U, L, B, NoOrder, R>
489    where
490        I: IntoIterator<Item = U>,
491        F: Fn(V) -> I + 'a,
492        K: Clone,
493    {
494        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
495        KeyedStream {
496            underlying: self.underlying.flat_map_unordered(q!({
497                let orig = f;
498                move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
499            })),
500            _phantom_order: Default::default(),
501        }
502    }
503
504    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
505    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
506    /// items in a **deterministic** order.
507    ///
508    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
509    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
510    ///
511    /// # Example
512    /// ```rust
513    /// # use hydro_lang::prelude::*;
514    /// # use futures::StreamExt;
515    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
516    /// process
517    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
518    ///     .into_keyed()
519    ///     .flatten_ordered()
520    /// #   .entries()
521    /// # }, |mut stream| async move {
522    /// // { 1: [2, 3, 4], 2: [5, 6] }
523    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
524    /// #     assert_eq!(stream.next().await.unwrap(), w);
525    /// # }
526    /// # }));
527    /// ```
528    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
529    where
530        V: IntoIterator<Item = U>,
531        K: Clone,
532    {
533        self.flat_map_ordered(q!(|d| d))
534    }
535
536    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
537    /// for the value type `V` to produce items in any order.
538    ///
539    /// # Example
540    /// ```rust
541    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
542    /// # use futures::StreamExt;
543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
544    /// process
545    ///     .source_iter(q!(vec![
546    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
547    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
548    ///     ]))
549    ///     .into_keyed()
550    ///     .flatten_unordered()
551    /// #   .entries()
552    /// # }, |mut stream| async move {
553    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
554    /// # let mut results = Vec::new();
555    /// # for _ in 0..4 {
556    /// #     results.push(stream.next().await.unwrap());
557    /// # }
558    /// # results.sort();
559    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
560    /// # }));
561    /// ```
562    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
563    where
564        V: IntoIterator<Item = U>,
565        K: Clone,
566    {
567        self.flat_map_unordered(q!(|d| d))
568    }
569
570    /// An operator which allows you to "inspect" each element of a stream without
571    /// modifying it. The closure `f` is called on a reference to each value. This is
572    /// mainly useful for debugging, and should not be used to generate side-effects.
573    ///
574    /// # Example
575    /// ```rust
576    /// # use hydro_lang::prelude::*;
577    /// # use futures::StreamExt;
578    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
579    /// process
580    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
581    ///     .into_keyed()
582    ///     .inspect(q!(|v| println!("{}", v)))
583    /// #   .entries()
584    /// # }, |mut stream| async move {
585    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
586    /// #     assert_eq!(stream.next().await.unwrap(), w);
587    /// # }
588    /// # }));
589    /// ```
590    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
591    where
592        F: Fn(&V) + 'a,
593    {
594        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
595        KeyedStream {
596            underlying: self.underlying.inspect(q!({
597                let orig = f;
598                move |(_k, v)| orig(v)
599            })),
600            _phantom_order: Default::default(),
601        }
602    }
603
604    /// An operator which allows you to "inspect" each element of a stream without
605    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
606    /// mainly useful for debugging, and should not be used to generate side-effects.
607    ///
608    /// # Example
609    /// ```rust
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// process
614    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
615    ///     .into_keyed()
616    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
617    /// #   .entries()
618    /// # }, |mut stream| async move {
619    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
620    /// #     assert_eq!(stream.next().await.unwrap(), w);
621    /// # }
622    /// # }));
623    /// ```
624    pub fn inspect_with_key<F>(
625        self,
626        f: impl IntoQuotedMut<'a, F, L>,
627    ) -> KeyedStream<K, V, L, B, O, R>
628    where
629        F: Fn(&(K, V)) + 'a,
630    {
631        KeyedStream {
632            underlying: self.underlying.inspect(f),
633            _phantom_order: Default::default(),
634        }
635    }
636
637    /// An operator which allows you to "name" a `HydroNode`.
638    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
639    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
640        {
641            let mut node = self.underlying.ir_node.borrow_mut();
642            let metadata = node.metadata_mut();
643            metadata.tag = Some(name.to_string());
644        }
645        self
646    }
647}
648
649impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
650    KeyedStream<K, V, L, Unbounded, O, R>
651{
652    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
653    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
654    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
655    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
656    ///
657    /// Currently, both input streams must be [`Unbounded`].
658    ///
659    /// # Example
660    /// ```rust
661    /// # use hydro_lang::prelude::*;
662    /// # use futures::StreamExt;
663    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
664    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
665    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
666    /// numbers1.interleave(numbers2)
667    /// #   .entries()
668    /// # }, |mut stream| async move {
669    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
670    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
671    /// #     assert_eq!(stream.next().await.unwrap(), w);
672    /// # }
673    /// # }));
674    /// ```
675    pub fn interleave<O2: Ordering, R2: Retries>(
676        self,
677        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
678    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
679    where
680        R: MinRetries<R2>,
681    {
682        self.entries().interleave(other.entries()).into_keyed()
683    }
684}
685
686/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
687/// control the processing of future elements.
688pub enum Generate<T> {
689    /// Emit the provided element, and keep processing future inputs.
690    Yield(T),
691    /// Emit the provided element as the _final_ element, do not process future inputs.
692    Return(T),
693    /// Do not emit anything, but continue processing future inputs.
694    Continue,
695    /// Do not emit anything, and do not process further inputs.
696    Break,
697}
698
699impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
700where
701    K: Eq + Hash,
702    L: Location<'a>,
703{
704    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
705    ///
706    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
707    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
708    /// early by returning `None`.
709    ///
710    /// The function takes a mutable reference to the accumulator and the current element, and returns
711    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
712    /// If the function returns `None`, the stream is terminated and no more elements are processed.
713    ///
714    /// # Example
715    /// ```rust
716    /// # use hydro_lang::prelude::*;
717    /// # use futures::StreamExt;
718    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719    /// process
720    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
721    ///     .into_keyed()
722    ///     .scan(
723    ///         q!(|| 0),
724    ///         q!(|acc, x| {
725    ///             *acc += x;
726    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
727    ///         }),
728    ///     )
729    /// #   .entries()
730    /// # }, |mut stream| async move {
731    /// // Output: { 0: [1], 1: [3, 7] }
732    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
733    /// #     assert_eq!(stream.next().await.unwrap(), w);
734    /// # }
735    /// # }));
736    /// ```
737    pub fn scan<A, U, I, F>(
738        self,
739        init: impl IntoQuotedMut<'a, I, L> + Copy,
740        f: impl IntoQuotedMut<'a, F, L> + Copy,
741    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
742    where
743        K: Clone,
744        I: Fn() -> A + 'a,
745        F: Fn(&mut A, V) -> Option<U> + 'a,
746    {
747        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
748        self.generator(
749            init,
750            q!({
751                let orig = f;
752                move |state, v| {
753                    if let Some(out) = orig(state, v) {
754                        Generate::Yield(out)
755                    } else {
756                        Generate::Break
757                    }
758                }
759            }),
760        )
761    }
762
763    /// Iteratively processes the elements in each group using a state machine that can yield
764    /// elements as it processes its inputs. This is designed to mirror the unstable generator
765    /// syntax in Rust, without requiring special syntax.
766    ///
767    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
768    /// state for each group. The second argument defines the processing logic, taking in a
769    /// mutable reference to the group's state and the value to be processed. It emits a
770    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
771    /// should be processed.
772    ///
773    /// # Example
774    /// ```rust
775    /// # use hydro_lang::prelude::*;
776    /// # use futures::StreamExt;
777    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778    /// process
779    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
780    ///     .into_keyed()
781    ///     .generator(
782    ///         q!(|| 0),
783    ///         q!(|acc, x| {
784    ///             *acc += x;
785    ///             if *acc > 100 {
786    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
787    ///                     "done!".to_string()
788    ///                 )
789    ///             } else if *acc % 2 == 0 {
790    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
791    ///                     "even".to_string()
792    ///                 )
793    ///             } else {
794    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
795    ///             }
796    ///         }),
797    ///     )
798    /// #   .entries()
799    /// # }, |mut stream| async move {
800    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
801    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
802    /// #     assert_eq!(stream.next().await.unwrap(), w);
803    /// # }
804    /// # }));
805    /// ```
806    pub fn generator<A, U, I, F>(
807        self,
808        init: impl IntoQuotedMut<'a, I, L> + Copy,
809        f: impl IntoQuotedMut<'a, F, L> + Copy,
810    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
811    where
812        K: Clone,
813        I: Fn() -> A + 'a,
814        F: Fn(&mut A, V) -> Generate<U> + 'a,
815    {
816        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
817        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
818        let underlying_scanned = self
819            .underlying
820            .assume_ordering(nondet!(
821                /** we do not rely on the order of keys */
822            ))
823            .scan(
824                q!(|| HashMap::new()),
825                q!(move |acc, (k, v)| {
826                    let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
827                    if let Some(existing_state_value) = existing_state {
828                        match f(existing_state_value, v) {
829                            Generate::Yield(out) => Some(Some((k, out))),
830                            Generate::Return(out) => {
831                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
832                                Some(Some((k, out)))
833                            }
834                            Generate::Break => {
835                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
836                                Some(None)
837                            }
838                            Generate::Continue => Some(None),
839                        }
840                    } else {
841                        Some(None)
842                    }
843                }),
844            )
845            .flatten_ordered();
846
847        KeyedStream {
848            underlying: underlying_scanned.into(),
849            _phantom_order: Default::default(),
850        }
851    }
852
853    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
854    /// in-order across the values in each group. But the aggregation function returns a boolean,
855    /// which when true indicates that the aggregated result is complete and can be released to
856    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
857    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
858    /// normal stream elements.
859    ///
860    /// # Example
861    /// ```rust
862    /// # use hydro_lang::prelude::*;
863    /// # use futures::StreamExt;
864    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
865    /// process
866    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
867    ///     .into_keyed()
868    ///     .fold_early_stop(
869    ///         q!(|| 0),
870    ///         q!(|acc, x| {
871    ///             *acc += x;
872    ///             x % 2 == 0
873    ///         }),
874    ///     )
875    /// #   .entries()
876    /// # }, |mut stream| async move {
877    /// // Output: { 0: 2, 1: 9 }
878    /// # for w in vec![(0, 2), (1, 9)] {
879    /// #     assert_eq!(stream.next().await.unwrap(), w);
880    /// # }
881    /// # }));
882    /// ```
883    pub fn fold_early_stop<A, I, F>(
884        self,
885        init: impl IntoQuotedMut<'a, I, L> + Copy,
886        f: impl IntoQuotedMut<'a, F, L> + Copy,
887    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
888    where
889        K: Clone,
890        I: Fn() -> A + 'a,
891        F: Fn(&mut A, V) -> bool + 'a,
892    {
893        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
894        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
895        let out_without_bound_cast = self
896            .generator(
897                q!(move || Some(init())),
898                q!(move |key_state, v| {
899                    if let Some(key_state_value) = key_state.as_mut() {
900                        if f(key_state_value, v) {
901                            Generate::Return(key_state.take().unwrap())
902                        } else {
903                            Generate::Continue
904                        }
905                    } else {
906                        unreachable!()
907                    }
908                }),
909            )
910            .underlying;
911
912        KeyedSingleton {
913            underlying: out_without_bound_cast,
914        }
915    }
916
917    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
918    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
919    /// otherwise the first element would be non-deterministic.
920    ///
921    /// # Example
922    /// ```rust
923    /// # use hydro_lang::prelude::*;
924    /// # use futures::StreamExt;
925    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
926    /// process
927    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
928    ///     .into_keyed()
929    ///     .first()
930    /// #   .entries()
931    /// # }, |mut stream| async move {
932    /// // Output: { 0: 2, 1: 3 }
933    /// # for w in vec![(0, 2), (1, 3)] {
934    /// #     assert_eq!(stream.next().await.unwrap(), w);
935    /// # }
936    /// # }));
937    /// ```
938    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
939    where
940        K: Clone,
941    {
942        self.fold_early_stop(
943            q!(|| None),
944            q!(|acc, v| {
945                *acc = Some(v);
946                true
947            }),
948        )
949        .map(q!(|v| v.unwrap()))
950    }
951
952    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
953    ///
954    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
955    /// to depend on the order of elements in the group.
956    ///
957    /// If the input and output value types are the same and do not require initialization then use
958    /// [`KeyedStream::reduce`].
959    ///
960    /// # Example
961    /// ```rust
962    /// # use hydro_lang::prelude::*;
963    /// # use futures::StreamExt;
964    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
965    /// let tick = process.tick();
966    /// let numbers = process
967    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
968    ///     .into_keyed();
969    /// let batch = numbers.batch(&tick, nondet!(/** test */));
970    /// batch
971    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
972    ///     .entries()
973    ///     .all_ticks()
974    /// # }, |mut stream| async move {
975    /// // (1, 5), (2, 7)
976    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
977    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
978    /// # }));
979    /// ```
980    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
981        self,
982        init: impl IntoQuotedMut<'a, I, L>,
983        comb: impl IntoQuotedMut<'a, F, L>,
984    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
985        let init = init.splice_fn0_ctx(&self.underlying.location).into();
986        let comb = comb
987            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
988            .into();
989
990        let out_ir = HydroNode::FoldKeyed {
991            init,
992            acc: comb,
993            input: Box::new(self.underlying.ir_node.into_inner()),
994            metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
995        };
996
997        KeyedSingleton {
998            underlying: Stream::new(self.underlying.location, out_ir),
999        }
1000    }
1001
1002    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1003    ///
1004    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1005    /// to depend on the order of elements in the stream.
1006    ///
1007    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1008    ///
1009    /// # Example
1010    /// ```rust
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// let numbers = process
1016    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1017    ///     .into_keyed();
1018    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1019    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1020    /// # }, |mut stream| async move {
1021    /// // (1, 5), (2, 7)
1022    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1023    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1024    /// # }));
1025    /// ```
1026    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1027        self,
1028        comb: impl IntoQuotedMut<'a, F, L>,
1029    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1030        let f = comb
1031            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1032            .into();
1033
1034        let out_ir = HydroNode::ReduceKeyed {
1035            f,
1036            input: Box::new(self.underlying.ir_node.into_inner()),
1037            metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1038        };
1039
1040        KeyedSingleton {
1041            underlying: Stream::new(self.underlying.location, out_ir),
1042        }
1043    }
1044
1045    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1046    ///
1047    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1048    /// to depend on the order of elements in the stream.
1049    ///
1050    /// # Example
1051    /// ```rust
1052    /// # use hydro_lang::prelude::*;
1053    /// # use futures::StreamExt;
1054    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1055    /// let tick = process.tick();
1056    /// let watermark = tick.singleton(q!(1));
1057    /// let numbers = process
1058    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1059    ///     .into_keyed();
1060    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1061    /// batch
1062    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1063    ///     .entries()
1064    ///     .all_ticks()
1065    /// # }, |mut stream| async move {
1066    /// // (2, 204)
1067    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1068    /// # }));
1069    /// ```
1070    pub fn reduce_watermark<O, F>(
1071        self,
1072        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1073        comb: impl IntoQuotedMut<'a, F, L>,
1074    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1075    where
1076        O: Clone,
1077        F: Fn(&mut V, V) + 'a,
1078    {
1079        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1080        check_matching_location(&self.underlying.location.root(), other.location.outer());
1081        let f = comb
1082            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1083            .into();
1084
1085        let out_ir = Stream::new(
1086            self.underlying.location.clone(),
1087            HydroNode::ReduceKeyedWatermark {
1088                f,
1089                input: Box::new(self.underlying.ir_node.into_inner()),
1090                watermark: Box::new(other.ir_node.into_inner()),
1091                metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1092            },
1093        );
1094
1095        KeyedSingleton { underlying: out_ir }
1096    }
1097}
1098
1099impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1100where
1101    K: Eq + Hash,
1102    L: Location<'a>,
1103{
1104    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1105    ///
1106    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1107    ///
1108    /// If the input and output value types are the same and do not require initialization then use
1109    /// [`KeyedStream::reduce_commutative`].
1110    ///
1111    /// # Example
1112    /// ```rust
1113    /// # use hydro_lang::prelude::*;
1114    /// # use futures::StreamExt;
1115    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1116    /// let tick = process.tick();
1117    /// let numbers = process
1118    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1119    ///     .into_keyed();
1120    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1121    /// batch
1122    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1123    ///     .entries()
1124    ///     .all_ticks()
1125    /// # }, |mut stream| async move {
1126    /// // (1, 5), (2, 7)
1127    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1128    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1129    /// # }));
1130    /// ```
1131    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1132        self,
1133        init: impl IntoQuotedMut<'a, I, L>,
1134        comb: impl IntoQuotedMut<'a, F, L>,
1135    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1136        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1137            .fold(init, comb)
1138    }
1139
1140    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1141    ///
1142    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1143    ///
1144    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1145    ///
1146    /// # Example
1147    /// ```rust
1148    /// # use hydro_lang::prelude::*;
1149    /// # use futures::StreamExt;
1150    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1151    /// let tick = process.tick();
1152    /// let numbers = process
1153    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1154    ///     .into_keyed();
1155    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1156    /// batch
1157    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1158    ///     .entries()
1159    ///     .all_ticks()
1160    /// # }, |mut stream| async move {
1161    /// // (1, 5), (2, 7)
1162    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1163    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1164    /// # }));
1165    /// ```
1166    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1167        self,
1168        comb: impl IntoQuotedMut<'a, F, L>,
1169    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1170        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1171            .reduce(comb)
1172    }
1173
1174    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1175    ///
1176    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1177    ///
1178    /// # Example
1179    /// ```rust
1180    /// # use hydro_lang::prelude::*;
1181    /// # use futures::StreamExt;
1182    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1183    /// let tick = process.tick();
1184    /// let watermark = tick.singleton(q!(1));
1185    /// let numbers = process
1186    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1187    ///     .into_keyed();
1188    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1189    /// batch
1190    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1191    ///     .entries()
1192    ///     .all_ticks()
1193    /// # }, |mut stream| async move {
1194    /// // (2, 204)
1195    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1196    /// # }));
1197    /// ```
1198    pub fn reduce_watermark_commutative<O2, F>(
1199        self,
1200        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1201        comb: impl IntoQuotedMut<'a, F, L>,
1202    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1203    where
1204        O2: Clone,
1205        F: Fn(&mut V, V) + 'a,
1206    {
1207        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1208            .reduce_watermark(other, comb)
1209    }
1210}
1211
1212impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1213where
1214    K: Eq + Hash,
1215    L: Location<'a>,
1216{
1217    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1218    ///
1219    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1220    ///
1221    /// If the input and output value types are the same and do not require initialization then use
1222    /// [`KeyedStream::reduce_idempotent`].
1223    ///
1224    /// # Example
1225    /// ```rust
1226    /// # use hydro_lang::prelude::*;
1227    /// # use futures::StreamExt;
1228    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229    /// let tick = process.tick();
1230    /// let numbers = process
1231    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1232    ///     .into_keyed();
1233    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1234    /// batch
1235    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1236    ///     .entries()
1237    ///     .all_ticks()
1238    /// # }, |mut stream| async move {
1239    /// // (1, false), (2, true)
1240    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1241    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1242    /// # }));
1243    /// ```
1244    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1245        self,
1246        init: impl IntoQuotedMut<'a, I, L>,
1247        comb: impl IntoQuotedMut<'a, F, L>,
1248    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1249        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1250            .fold(init, comb)
1251    }
1252
1253    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1254    ///
1255    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1256    ///
1257    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1258    ///
1259    /// # Example
1260    /// ```rust
1261    /// # use hydro_lang::prelude::*;
1262    /// # use futures::StreamExt;
1263    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1264    /// let tick = process.tick();
1265    /// let numbers = process
1266    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1267    ///     .into_keyed();
1268    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1269    /// batch
1270    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1271    ///     .entries()
1272    ///     .all_ticks()
1273    /// # }, |mut stream| async move {
1274    /// // (1, false), (2, true)
1275    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1276    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1277    /// # }));
1278    /// ```
1279    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1280        self,
1281        comb: impl IntoQuotedMut<'a, F, L>,
1282    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1283        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1284            .reduce(comb)
1285    }
1286
1287    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1288    ///
1289    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1290    ///
1291    /// # Example
1292    /// ```rust
1293    /// # use hydro_lang::prelude::*;
1294    /// # use futures::StreamExt;
1295    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296    /// let tick = process.tick();
1297    /// let watermark = tick.singleton(q!(1));
1298    /// let numbers = process
1299    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1300    ///     .into_keyed();
1301    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1302    /// batch
1303    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1304    ///     .entries()
1305    ///     .all_ticks()
1306    /// # }, |mut stream| async move {
1307    /// // (2, true)
1308    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1309    /// # }));
1310    /// ```
1311    pub fn reduce_watermark_idempotent<O2, F>(
1312        self,
1313        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1314        comb: impl IntoQuotedMut<'a, F, L>,
1315    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1316    where
1317        O2: Clone,
1318        F: Fn(&mut V, V) + 'a,
1319    {
1320        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1321            .reduce_watermark(other, comb)
1322    }
1323}
1324
1325impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1326where
1327    K: Eq + Hash,
1328    L: Location<'a>,
1329{
1330    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1331    ///
1332    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1333    /// as there may be non-deterministic duplicates.
1334    ///
1335    /// If the input and output value types are the same and do not require initialization then use
1336    /// [`KeyedStream::reduce_commutative_idempotent`].
1337    ///
1338    /// # Example
1339    /// ```rust
1340    /// # use hydro_lang::prelude::*;
1341    /// # use futures::StreamExt;
1342    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1343    /// let tick = process.tick();
1344    /// let numbers = process
1345    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1346    ///     .into_keyed();
1347    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1348    /// batch
1349    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1350    ///     .entries()
1351    ///     .all_ticks()
1352    /// # }, |mut stream| async move {
1353    /// // (1, false), (2, true)
1354    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1355    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1356    /// # }));
1357    /// ```
1358    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1359        self,
1360        init: impl IntoQuotedMut<'a, I, L>,
1361        comb: impl IntoQuotedMut<'a, F, L>,
1362    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1363        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1364            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1365            .fold(init, comb)
1366    }
1367
1368    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1369    ///
1370    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1371    /// as there may be non-deterministic duplicates.
1372    ///
1373    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1374    ///
1375    /// # Example
1376    /// ```rust
1377    /// # use hydro_lang::prelude::*;
1378    /// # use futures::StreamExt;
1379    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1380    /// let tick = process.tick();
1381    /// let numbers = process
1382    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1383    ///     .into_keyed();
1384    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1385    /// batch
1386    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1387    ///     .entries()
1388    ///     .all_ticks()
1389    /// # }, |mut stream| async move {
1390    /// // (1, false), (2, true)
1391    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1392    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1393    /// # }));
1394    /// ```
1395    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1396        self,
1397        comb: impl IntoQuotedMut<'a, F, L>,
1398    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1399        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1400            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1401            .reduce(comb)
1402    }
1403
1404    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1405    ///
1406    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1407    /// as there may be non-deterministic duplicates.
1408    ///
1409    /// # Example
1410    /// ```rust
1411    /// # use hydro_lang::prelude::*;
1412    /// # use futures::StreamExt;
1413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1414    /// let tick = process.tick();
1415    /// let watermark = tick.singleton(q!(1));
1416    /// let numbers = process
1417    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1418    ///     .into_keyed();
1419    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1420    /// batch
1421    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1422    ///     .entries()
1423    ///     .all_ticks()
1424    /// # }, |mut stream| async move {
1425    /// // (2, true)
1426    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1427    /// # }));
1428    /// ```
1429    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1430        self,
1431        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1432        comb: impl IntoQuotedMut<'a, F, L>,
1433    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1434    where
1435        O2: Clone,
1436        F: Fn(&mut V, V) + 'a,
1437    {
1438        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1439            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1440            .reduce_watermark(other, comb)
1441    }
1442
1443    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1444    /// whose keys are not in the bounded stream.
1445    ///
1446    /// # Example
1447    /// ```rust
1448    /// # use hydro_lang::prelude::*;
1449    /// # use futures::StreamExt;
1450    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1451    /// let tick = process.tick();
1452    /// let keyed_stream = process
1453    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1454    ///     .batch(&tick, nondet!(/** test */))
1455    ///     .into_keyed();
1456    /// let keys_to_remove = process
1457    ///     .source_iter(q!(vec![1, 2]))
1458    ///     .batch(&tick, nondet!(/** test */));
1459    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1460    /// #   .entries()
1461    /// # }, |mut stream| async move {
1462    /// // { 3: ['c'], 4: ['d'] }
1463    /// # for w in vec![(3, 'c'), (4, 'd')] {
1464    /// #     assert_eq!(stream.next().await.unwrap(), w);
1465    /// # }
1466    /// # }));
1467    /// ```
1468    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1469        self,
1470        other: Stream<K, L, Bounded, O2, R2>,
1471    ) -> Self {
1472        KeyedStream {
1473            underlying: self.entries().anti_join(other),
1474            _phantom_order: Default::default(),
1475        }
1476    }
1477}
1478
1479impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1480where
1481    L: Location<'a> + NoTick + NoAtomic,
1482{
1483    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1484    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1485    ///
1486    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1487    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1488    /// argument that declares where the stream will be atomically processed. Batching a stream into
1489    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1490    /// [`Tick`] will introduce asynchrony.
1491    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1492        KeyedStream {
1493            underlying: self.underlying.atomic(tick),
1494            _phantom_order: Default::default(),
1495        }
1496    }
1497
1498    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1499    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1500    /// the order of the input.
1501    ///
1502    /// # Non-Determinism
1503    /// The batch boundaries are non-deterministic and may change across executions.
1504    pub fn batch(
1505        self,
1506        tick: &Tick<L>,
1507        nondet: NonDet,
1508    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1509        self.atomic(tick).batch(nondet)
1510    }
1511}
1512
1513impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1514where
1515    L: Location<'a> + NoTick + NoAtomic,
1516{
1517    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1518    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1519    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1520    /// used to create the atomic section.
1521    ///
1522    /// # Non-Determinism
1523    /// The batch boundaries are non-deterministic and may change across executions.
1524    pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1525        KeyedStream {
1526            underlying: self.underlying.batch(nondet),
1527            _phantom_order: Default::default(),
1528        }
1529    }
1530
1531    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1532    /// See [`KeyedStream::atomic`] for more details.
1533    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1534        KeyedStream {
1535            underlying: self.underlying.end_atomic(),
1536            _phantom_order: Default::default(),
1537        }
1538    }
1539}
1540
1541impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1542where
1543    L: Location<'a>,
1544{
1545    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1546    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1547    /// is only present in one of the inputs, its values are passed through as-is). The output has
1548    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1549    ///
1550    /// Currently, both input streams must be [`Bounded`]. This operator will block
1551    /// on the first stream until all its elements are available. In a future version,
1552    /// we will relax the requirement on the `other` stream.
1553    ///
1554    /// # Example
1555    /// ```rust
1556    /// # use hydro_lang::prelude::*;
1557    /// # use futures::StreamExt;
1558    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1559    /// let tick = process.tick();
1560    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1561    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1562    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1563    /// # .entries()
1564    /// # }, |mut stream| async move {
1565    /// // { 0: [2, 1], 1: [4, 3] }
1566    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1567    /// #     assert_eq!(stream.next().await.unwrap(), w);
1568    /// # }
1569    /// # }));
1570    /// ```
1571    pub fn chain<O2: Ordering>(
1572        self,
1573        other: KeyedStream<K, V, L, Bounded, O2, R>,
1574    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1575    where
1576        O: MinOrder<O2>,
1577    {
1578        KeyedStream {
1579            underlying: self.underlying.chain(other.underlying),
1580            _phantom_order: Default::default(),
1581        }
1582    }
1583}
1584
1585impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1586where
1587    L: Location<'a>,
1588{
1589    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1590    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1591    /// each key.
1592    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1593        KeyedStream {
1594            underlying: self.underlying.all_ticks(),
1595            _phantom_order: Default::default(),
1596        }
1597    }
1598
1599    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1600    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1601    /// each key.
1602    ///
1603    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1604    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1605    /// stream's [`Tick`] context.
1606    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1607        KeyedStream {
1608            underlying: self.underlying.all_ticks(),
1609            _phantom_order: Default::default(),
1610        }
1611    }
1612
1613    #[expect(missing_docs, reason = "TODO")]
1614    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1615        KeyedStream {
1616            underlying: self.underlying.defer_tick(),
1617            _phantom_order: Default::default(),
1618        }
1619    }
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624    use futures::{SinkExt, StreamExt};
1625    use hydro_deploy::Deployment;
1626    use stageleft::q;
1627
1628    use crate::compile::builder::FlowBuilder;
1629    use crate::location::Location;
1630    use crate::nondet::nondet;
1631
1632    #[tokio::test]
1633    async fn reduce_watermark_filter() {
1634        let mut deployment = Deployment::new();
1635
1636        let flow = FlowBuilder::new();
1637        let node = flow.process::<()>();
1638        let external = flow.external::<()>();
1639
1640        let node_tick = node.tick();
1641        let watermark = node_tick.singleton(q!(1));
1642
1643        let sum = node
1644            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1645            .into_keyed()
1646            .reduce_watermark(
1647                watermark,
1648                q!(|acc, v| {
1649                    *acc += v;
1650                }),
1651            )
1652            .snapshot(&node_tick, nondet!(/** test */))
1653            .entries()
1654            .all_ticks()
1655            .send_bincode_external(&external);
1656
1657        let nodes = flow
1658            .with_process(&node, deployment.Localhost())
1659            .with_external(&external, deployment.Localhost())
1660            .deploy(&mut deployment);
1661
1662        deployment.deploy().await.unwrap();
1663
1664        let mut out = nodes.connect_source_bincode(sum).await;
1665
1666        deployment.start().await.unwrap();
1667
1668        assert_eq!(out.next().await.unwrap(), (2, 204));
1669    }
1670
1671    #[tokio::test]
1672    async fn reduce_watermark_garbage_collect() {
1673        let mut deployment = Deployment::new();
1674
1675        let flow = FlowBuilder::new();
1676        let node = flow.process::<()>();
1677        let external = flow.external::<()>();
1678        let (tick_send, tick_trigger) = node.source_external_bincode(&external);
1679
1680        let node_tick = node.tick();
1681        let (watermark_complete_cycle, watermark) =
1682            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
1683        let next_watermark = watermark.clone().map(q!(|v| v + 1));
1684        watermark_complete_cycle.complete_next_tick(next_watermark);
1685
1686        let tick_triggered_input = node
1687            .source_iter(q!([(3, 103)]))
1688            .batch(&node_tick, nondet!(/** test */))
1689            .filter_if_some(
1690                tick_trigger
1691                    .clone()
1692                    .batch(&node_tick, nondet!(/** test */))
1693                    .first(),
1694            )
1695            .all_ticks();
1696
1697        let sum = node
1698            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1699            .interleave(tick_triggered_input)
1700            .into_keyed()
1701            .reduce_watermark_commutative(
1702                watermark,
1703                q!(|acc, v| {
1704                    *acc += v;
1705                }),
1706            )
1707            .snapshot(&node_tick, nondet!(/** test */))
1708            .entries()
1709            .all_ticks()
1710            .send_bincode_external(&external);
1711
1712        let nodes = flow
1713            .with_default_optimize()
1714            .with_process(&node, deployment.Localhost())
1715            .with_external(&external, deployment.Localhost())
1716            .deploy(&mut deployment);
1717
1718        deployment.deploy().await.unwrap();
1719
1720        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
1721        let mut out_recv = nodes.connect_source_bincode(sum).await;
1722
1723        deployment.start().await.unwrap();
1724
1725        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
1726
1727        tick_send.send(()).await.unwrap();
1728
1729        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
1730    }
1731}