hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this keyed stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this keyed stream. It sits behind a `RefCell` so that it can
    /// be rewritten through a shared reference, as the `Clone` implementation does.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Marker that ties the type parameters to the struct without storing values of them.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65    for KeyedStream<K, V, L, B, NoOrder, R>
66where
67    L: Location<'a>,
68{
69    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70        KeyedStream {
71            location: stream.location,
72            ir_node: stream.ir_node,
73            _phantom: PhantomData,
74        }
75    }
76}
77
/// Implements [`DeferTick`] for bounded keyed streams that live inside a [`Tick`].
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Forwards to the `defer_tick` function reached through the `KeyedStream::` path.
    // NOTE(review): presumably this resolves to an inherent `KeyedStream::defer_tick` defined
    // elsewhere in this file rather than recursing into this trait method — confirm.
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90    L: Location<'a>,
91{
92    type Location = Tick<L>;
93
94    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95        KeyedStream {
96            location: location.clone(),
97            ir_node: RefCell::new(HydroNode::CycleSource {
98                ident,
99                metadata: location.new_node_metadata(
100                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101                ),
102            }),
103            _phantom: PhantomData,
104        }
105    }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132    for KeyedStream<K, V, L, B, O, R>
133where
134    L: Location<'a> + NoTick,
135{
136    type Location = L;
137
138    fn create_source(ident: syn::Ident, location: L) -> Self {
139        KeyedStream {
140            location: location.clone(),
141            ir_node: RefCell::new(HydroNode::CycleSource {
142                ident,
143                metadata: location
144                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152    for KeyedStream<K, V, L, B, O, R>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176    fn clone(&self) -> Self {
177        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179            *self.ir_node.borrow_mut() = HydroNode::Tee {
180                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181                metadata: self.location.new_node_metadata(Self::collection_kind()),
182            };
183        }
184
185        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186            KeyedStream {
187                location: self.location.clone(),
188                ir_node: HydroNode::Tee {
189                    inner: TeeNode(inner.0.clone()),
190                    metadata: metadata.clone(),
191                }
192                .into(),
193                _phantom: PhantomData,
194            }
195        } else {
196            unreachable!()
197        }
198    }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202    KeyedStream<K, V, L, B, O, R>
203{
204    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208        KeyedStream {
209            location,
210            ir_node: RefCell::new(ir_node),
211            _phantom: PhantomData,
212        }
213    }
214
215    /// Returns the [`CollectionKind`] corresponding to this type.
216    pub fn collection_kind() -> CollectionKind {
217        CollectionKind::KeyedStream {
218            bound: B::BOUND_KIND,
219            value_order: O::ORDERING_KIND,
220            value_retry: R::RETRIES_KIND,
221            key_type: stageleft::quote_type::<K>().into(),
222            value_type: stageleft::quote_type::<V>().into(),
223        }
224    }
225
226    /// Returns the [`Location`] where this keyed stream is being materialized.
227    pub fn location(&self) -> &L {
228        &self.location
229    }
230
231    /// Explicitly "casts" the keyed stream to a type with a different ordering
232    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233    /// by the type-system.
234    ///
235    /// # Non-Determinism
236    /// This function is used as an escape hatch, and any mistakes in the
237    /// provided ordering guarantee will propagate into the guarantees
238    /// for the rest of the program.
239    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240        if O::ORDERING_KIND == O2::ORDERING_KIND {
241            KeyedStream::new(self.location, self.ir_node.into_inner())
242        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243            // We can always weaken the ordering guarantee
244            KeyedStream::new(
245                self.location.clone(),
246                HydroNode::Cast {
247                    inner: Box::new(self.ir_node.into_inner()),
248                    metadata: self
249                        .location
250                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251                },
252            )
253        } else {
254            KeyedStream::new(
255                self.location.clone(),
256                HydroNode::ObserveNonDet {
257                    inner: Box::new(self.ir_node.into_inner()),
258                    trusted: false,
259                    metadata: self
260                        .location
261                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262                },
263            )
264        }
265    }
266
267    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
268    /// which is always safe because that is the weakest possible guarantee.
269    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
270        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
271        self.assume_ordering::<NoOrder>(nondet)
272    }
273
274    /// Explicitly "casts" the keyed stream to a type with a different retries
275    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
276    /// be proven by the type-system.
277    ///
278    /// # Non-Determinism
279    /// This function is used as an escape hatch, and any mistakes in the
280    /// provided retries guarantee will propagate into the guarantees
281    /// for the rest of the program.
282    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
283        if R::RETRIES_KIND == R2::RETRIES_KIND {
284            KeyedStream::new(self.location, self.ir_node.into_inner())
285        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
286            // We can always weaken the retries guarantee
287            KeyedStream::new(
288                self.location.clone(),
289                HydroNode::Cast {
290                    inner: Box::new(self.ir_node.into_inner()),
291                    metadata: self
292                        .location
293                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
294                },
295            )
296        } else {
297            KeyedStream::new(
298                self.location.clone(),
299                HydroNode::ObserveNonDet {
300                    inner: Box::new(self.ir_node.into_inner()),
301                    trusted: false,
302                    metadata: self
303                        .location
304                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
305                },
306            )
307        }
308    }
309
310    /// Flattens the keyed stream into an unordered stream of key-value pairs.
311    ///
312    /// # Example
313    /// ```rust
314    /// # use hydro_lang::prelude::*;
315    /// # use futures::StreamExt;
316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317    /// process
318    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
319    ///     .into_keyed()
320    ///     .entries()
321    /// # }, |mut stream| async move {
322    /// // (1, 2), (1, 3), (2, 4) in any order
323    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
324    /// #     assert_eq!(stream.next().await.unwrap(), w);
325    /// # }
326    /// # }));
327    /// ```
328    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
329        Stream::new(
330            self.location.clone(),
331            HydroNode::Cast {
332                inner: Box::new(self.ir_node.into_inner()),
333                metadata: self
334                    .location
335                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
336            },
337        )
338    }
339
340    /// Flattens the keyed stream into an unordered stream of only the values.
341    ///
342    /// # Example
343    /// ```rust
344    /// # use hydro_lang::prelude::*;
345    /// # use futures::StreamExt;
346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
347    /// process
348    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
349    ///     .into_keyed()
350    ///     .values()
351    /// # }, |mut stream| async move {
352    /// // 2, 3, 4 in any order
353    /// # for w in vec![2, 3, 4] {
354    /// #     assert_eq!(stream.next().await.unwrap(), w);
355    /// # }
356    /// # }));
357    /// ```
358    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
359        self.entries().map(q!(|(_, v)| v))
360    }
361
362    /// Transforms each value by invoking `f` on each element, with keys staying the same
363    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
364    ///
365    /// If you do not want to modify the stream and instead only want to view
366    /// each item use [`KeyedStream::inspect`] instead.
367    ///
368    /// # Example
369    /// ```rust
370    /// # use hydro_lang::prelude::*;
371    /// # use futures::StreamExt;
372    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
373    /// process
374    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
375    ///     .into_keyed()
376    ///     .map(q!(|v| v + 1))
377    /// #   .entries()
378    /// # }, |mut stream| async move {
379    /// // { 1: [3, 4], 2: [5] }
380    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
381    /// #     assert_eq!(stream.next().await.unwrap(), w);
382    /// # }
383    /// # }));
384    /// ```
385    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
386    where
387        F: Fn(V) -> U + 'a,
388    {
389        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
390        let map_f = q!({
391            let orig = f;
392            move |(k, v)| (k, orig(v))
393        })
394        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
395        .into();
396
397        KeyedStream::new(
398            self.location.clone(),
399            HydroNode::Map {
400                f: map_f,
401                input: Box::new(self.ir_node.into_inner()),
402                metadata: self
403                    .location
404                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
405            },
406        )
407    }
408
409    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
410    /// re-grouped even they are tuples; instead they will be grouped under the original key.
411    ///
412    /// If you do not want to modify the stream and instead only want to view
413    /// each item use [`KeyedStream::inspect_with_key`] instead.
414    ///
415    /// # Example
416    /// ```rust
417    /// # use hydro_lang::prelude::*;
418    /// # use futures::StreamExt;
419    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
420    /// process
421    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
422    ///     .into_keyed()
423    ///     .map_with_key(q!(|(k, v)| k + v))
424    /// #   .entries()
425    /// # }, |mut stream| async move {
426    /// // { 1: [3, 4], 2: [6] }
427    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
428    /// #     assert_eq!(stream.next().await.unwrap(), w);
429    /// # }
430    /// # }));
431    /// ```
432    pub fn map_with_key<U, F>(
433        self,
434        f: impl IntoQuotedMut<'a, F, L> + Copy,
435    ) -> KeyedStream<K, U, L, B, O, R>
436    where
437        F: Fn((K, V)) -> U + 'a,
438        K: Clone,
439    {
440        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
441        let map_f = q!({
442            let orig = f;
443            move |(k, v)| {
444                let out = orig((Clone::clone(&k), v));
445                (k, out)
446            }
447        })
448        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
449        .into();
450
451        KeyedStream::new(
452            self.location.clone(),
453            HydroNode::Map {
454                f: map_f,
455                input: Box::new(self.ir_node.into_inner()),
456                metadata: self
457                    .location
458                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
459            },
460        )
461    }
462
463    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
464    /// `f`, preserving the order of the elements within the group.
465    ///
466    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
467    /// not modify or take ownership of the values. If you need to modify the values while filtering
468    /// use [`KeyedStream::filter_map`] instead.
469    ///
470    /// # Example
471    /// ```rust
472    /// # use hydro_lang::prelude::*;
473    /// # use futures::StreamExt;
474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
475    /// process
476    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
477    ///     .into_keyed()
478    ///     .filter(q!(|&x| x > 2))
479    /// #   .entries()
480    /// # }, |mut stream| async move {
481    /// // { 1: [3], 2: [4] }
482    /// # for w in vec![(1, 3), (2, 4)] {
483    /// #     assert_eq!(stream.next().await.unwrap(), w);
484    /// # }
485    /// # }));
486    /// ```
487    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
488    where
489        F: Fn(&V) -> bool + 'a,
490    {
491        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
492        let filter_f = q!({
493            let orig = f;
494            move |t: &(_, _)| orig(&t.1)
495        })
496        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
497        .into();
498
499        KeyedStream::new(
500            self.location.clone(),
501            HydroNode::Filter {
502                f: filter_f,
503                input: Box::new(self.ir_node.into_inner()),
504                metadata: self.location.new_node_metadata(Self::collection_kind()),
505            },
506        )
507    }
508
509    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
510    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
511    ///
512    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
513    /// not modify or take ownership of the values. If you need to modify the values while filtering
514    /// use [`KeyedStream::filter_map_with_key`] instead.
515    ///
516    /// # Example
517    /// ```rust
518    /// # use hydro_lang::prelude::*;
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
521    /// process
522    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
523    ///     .into_keyed()
524    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
525    /// #   .entries()
526    /// # }, |mut stream| async move {
527    /// // { 1: [3], 2: [4] }
528    /// # for w in vec![(1, 3), (2, 4)] {
529    /// #     assert_eq!(stream.next().await.unwrap(), w);
530    /// # }
531    /// # }));
532    /// ```
533    pub fn filter_with_key<F>(
534        self,
535        f: impl IntoQuotedMut<'a, F, L> + Copy,
536    ) -> KeyedStream<K, V, L, B, O, R>
537    where
538        F: Fn(&(K, V)) -> bool + 'a,
539    {
540        let filter_f = f
541            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
542            .into();
543
544        KeyedStream::new(
545            self.location.clone(),
546            HydroNode::Filter {
547                f: filter_f,
548                input: Box::new(self.ir_node.into_inner()),
549                metadata: self.location.new_node_metadata(Self::collection_kind()),
550            },
551        )
552    }
553
554    /// An operator that both filters and maps each value, with keys staying the same.
555    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
556    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
557    ///
558    /// # Example
559    /// ```rust
560    /// # use hydro_lang::prelude::*;
561    /// # use futures::StreamExt;
562    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563    /// process
564    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
565    ///     .into_keyed()
566    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
567    /// #   .entries()
568    /// # }, |mut stream| async move {
569    /// // { 1: [2], 2: [4] }
570    /// # for w in vec![(1, 2), (2, 4)] {
571    /// #     assert_eq!(stream.next().await.unwrap(), w);
572    /// # }
573    /// # }));
574    /// ```
575    pub fn filter_map<U, F>(
576        self,
577        f: impl IntoQuotedMut<'a, F, L> + Copy,
578    ) -> KeyedStream<K, U, L, B, O, R>
579    where
580        F: Fn(V) -> Option<U> + 'a,
581    {
582        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
583        let filter_map_f = q!({
584            let orig = f;
585            move |(k, v)| orig(v).map(|o| (k, o))
586        })
587        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
588        .into();
589
590        KeyedStream::new(
591            self.location.clone(),
592            HydroNode::FilterMap {
593                f: filter_map_f,
594                input: Box::new(self.ir_node.into_inner()),
595                metadata: self
596                    .location
597                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
598            },
599        )
600    }
601
602    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
603    /// re-grouped even they are tuples; instead they will be grouped under the original key.
604    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
605    ///
606    /// # Example
607    /// ```rust
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// process
612    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
613    ///     .into_keyed()
614    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
615    /// #   .entries()
616    /// # }, |mut stream| async move {
617    /// // { 2: [2] }
618    /// # for w in vec![(2, 2)] {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// ```
623    pub fn filter_map_with_key<U, F>(
624        self,
625        f: impl IntoQuotedMut<'a, F, L> + Copy,
626    ) -> KeyedStream<K, U, L, B, O, R>
627    where
628        F: Fn((K, V)) -> Option<U> + 'a,
629        K: Clone,
630    {
631        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
632        let filter_map_f = q!({
633            let orig = f;
634            move |(k, v)| {
635                let out = orig((Clone::clone(&k), v));
636                out.map(|o| (k, o))
637            }
638        })
639        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
640        .into();
641
642        KeyedStream::new(
643            self.location.clone(),
644            HydroNode::FilterMap {
645                f: filter_map_f,
646                input: Box::new(self.ir_node.into_inner()),
647                metadata: self
648                    .location
649                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
650            },
651        )
652    }
653
654    /// For each value `v` in each group, transform `v` using `f` and then treat the
655    /// result as an [`Iterator`] to produce values one by one within the same group.
656    /// The implementation for [`Iterator`] for the output type `I` must produce items
657    /// in a **deterministic** order.
658    ///
659    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
660    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
661    ///
662    /// # Example
663    /// ```rust
664    /// # use hydro_lang::prelude::*;
665    /// # use futures::StreamExt;
666    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
667    /// process
668    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
669    ///     .into_keyed()
670    ///     .flat_map_ordered(q!(|x| x))
671    /// #   .entries()
672    /// # }, |mut stream| async move {
673    /// // { 1: [2, 3, 4], 2: [5, 6] }
674    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
675    /// #     assert_eq!(stream.next().await.unwrap(), w);
676    /// # }
677    /// # }));
678    /// ```
679    pub fn flat_map_ordered<U, I, F>(
680        self,
681        f: impl IntoQuotedMut<'a, F, L> + Copy,
682    ) -> KeyedStream<K, U, L, B, O, R>
683    where
684        I: IntoIterator<Item = U>,
685        F: Fn(V) -> I + 'a,
686        K: Clone,
687    {
688        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
689        let flat_map_f = q!({
690            let orig = f;
691            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
692        })
693        .splice_fn1_ctx::<(K, V), _>(&self.location)
694        .into();
695
696        KeyedStream::new(
697            self.location.clone(),
698            HydroNode::FlatMap {
699                f: flat_map_f,
700                input: Box::new(self.ir_node.into_inner()),
701                metadata: self
702                    .location
703                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
704            },
705        )
706    }
707
708    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
709    /// for the output type `I` to produce items in any order.
710    ///
711    /// # Example
712    /// ```rust
713    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
716    /// process
717    ///     .source_iter(q!(vec![
718    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
719    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
720    ///     ]))
721    ///     .into_keyed()
722    ///     .flat_map_unordered(q!(|x| x))
723    /// #   .entries()
724    /// # }, |mut stream| async move {
725    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
726    /// # let mut results = Vec::new();
727    /// # for _ in 0..4 {
728    /// #     results.push(stream.next().await.unwrap());
729    /// # }
730    /// # results.sort();
731    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
732    /// # }));
733    /// ```
734    pub fn flat_map_unordered<U, I, F>(
735        self,
736        f: impl IntoQuotedMut<'a, F, L> + Copy,
737    ) -> KeyedStream<K, U, L, B, NoOrder, R>
738    where
739        I: IntoIterator<Item = U>,
740        F: Fn(V) -> I + 'a,
741        K: Clone,
742    {
743        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
744        let flat_map_f = q!({
745            let orig = f;
746            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
747        })
748        .splice_fn1_ctx::<(K, V), _>(&self.location)
749        .into();
750
751        KeyedStream::new(
752            self.location.clone(),
753            HydroNode::FlatMap {
754                f: flat_map_f,
755                input: Box::new(self.ir_node.into_inner()),
756                metadata: self
757                    .location
758                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
759            },
760        )
761    }
762
    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
    /// items in a **deterministic** order.
    ///
    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
    ///
    /// This is [`KeyedStream::flat_map_ordered`] applied with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    ///     .into_keyed()
    ///     .flatten_ordered()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3, 4], 2: [5, 6] }
    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        self.flat_map_ordered(q!(|d| d))
    }
794
    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the value type `V` to produce items in any order.
    ///
    /// This is [`KeyedStream::flat_map_unordered`] applied with the identity closure, so the
    /// result carries a [`NoOrder`] guarantee within each group.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flatten_unordered()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        self.flat_map_unordered(q!(|d| d))
    }
828
829    /// An operator which allows you to "inspect" each element of a stream without
830    /// modifying it. The closure `f` is called on a reference to each value. This is
831    /// mainly useful for debugging, and should not be used to generate side-effects.
832    ///
833    /// # Example
834    /// ```rust
835    /// # use hydro_lang::prelude::*;
836    /// # use futures::StreamExt;
837    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
838    /// process
839    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
840    ///     .into_keyed()
841    ///     .inspect(q!(|v| println!("{}", v)))
842    /// #   .entries()
843    /// # }, |mut stream| async move {
844    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
845    /// #     assert_eq!(stream.next().await.unwrap(), w);
846    /// # }
847    /// # }));
848    /// ```
849    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
850    where
851        F: Fn(&V) + 'a,
852    {
853        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
854        let inspect_f = q!({
855            let orig = f;
856            move |t: &(_, _)| orig(&t.1)
857        })
858        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
859        .into();
860
861        KeyedStream::new(
862            self.location.clone(),
863            HydroNode::Inspect {
864                f: inspect_f,
865                input: Box::new(self.ir_node.into_inner()),
866                metadata: self.location.new_node_metadata(Self::collection_kind()),
867            },
868        )
869    }
870
871    /// An operator which allows you to "inspect" each element of a stream without
872    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
873    /// mainly useful for debugging, and should not be used to generate side-effects.
874    ///
875    /// # Example
876    /// ```rust
877    /// # use hydro_lang::prelude::*;
878    /// # use futures::StreamExt;
879    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880    /// process
881    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
882    ///     .into_keyed()
883    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
884    /// #   .entries()
885    /// # }, |mut stream| async move {
886    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
887    /// #     assert_eq!(stream.next().await.unwrap(), w);
888    /// # }
889    /// # }));
890    /// ```
891    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
892    where
893        F: Fn(&(K, V)) + 'a,
894    {
895        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
896
897        KeyedStream::new(
898            self.location.clone(),
899            HydroNode::Inspect {
900                f: inspect_f,
901                input: Box::new(self.ir_node.into_inner()),
902                metadata: self.location.new_node_metadata(Self::collection_kind()),
903            },
904        )
905    }
906
907    /// An operator which allows you to "name" a `HydroNode`.
908    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
909    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
910        {
911            let mut node = self.ir_node.borrow_mut();
912            let metadata = node.metadata_mut();
913            metadata.tag = Some(name.to_string());
914        }
915        self
916    }
917}
918
919impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
920    KeyedStream<K, V, L, Unbounded, O, R>
921{
922    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
923    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
924    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
925    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
926    ///
927    /// Currently, both input streams must be [`Unbounded`].
928    ///
929    /// # Example
930    /// ```rust
931    /// # use hydro_lang::prelude::*;
932    /// # use futures::StreamExt;
933    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
935    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
936    /// numbers1.interleave(numbers2)
937    /// #   .entries()
938    /// # }, |mut stream| async move {
939    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
940    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
941    /// #     assert_eq!(stream.next().await.unwrap(), w);
942    /// # }
943    /// # }));
944    /// ```
945    pub fn interleave<O2: Ordering, R2: Retries>(
946        self,
947        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
948    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
949    where
950        R: MinRetries<R2>,
951    {
952        let tick = self.location.tick();
953        // Because the outputs are unordered, we can interleave batches from both streams.
954        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
955        self.batch(&tick, nondet_batch_interleaving)
956            .weakest_ordering()
957            .chain(
958                other
959                    .batch(&tick, nondet_batch_interleaving)
960                    .weakest_ordering(),
961            )
962            .all_ticks()
963    }
964}
965
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant combines two decisions: whether an element is emitted for the current input,
/// and whether later inputs are still processed.
///
/// The common traits `Debug`, `Clone`, `PartialEq` and `Eq` are derived (conditionally on `T`)
/// so that values can be logged, duplicated and compared, e.g. in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
978
979impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
980where
981    K: Eq + Hash,
982    L: Location<'a>,
983{
984    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
985    ///
986    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
987    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
988    /// early by returning `None`.
989    ///
990    /// The function takes a mutable reference to the accumulator and the current element, and returns
991    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
992    /// If the function returns `None`, the stream is terminated and no more elements are processed.
993    ///
994    /// # Example
995    /// ```rust
996    /// # use hydro_lang::prelude::*;
997    /// # use futures::StreamExt;
998    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
999    /// process
1000    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1001    ///     .into_keyed()
1002    ///     .scan(
1003    ///         q!(|| 0),
1004    ///         q!(|acc, x| {
1005    ///             *acc += x;
1006    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1007    ///         }),
1008    ///     )
1009    /// #   .entries()
1010    /// # }, |mut stream| async move {
1011    /// // Output: { 0: [1], 1: [3, 7] }
1012    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1013    /// #     assert_eq!(stream.next().await.unwrap(), w);
1014    /// # }
1015    /// # }));
1016    /// ```
1017    pub fn scan<A, U, I, F>(
1018        self,
1019        init: impl IntoQuotedMut<'a, I, L> + Copy,
1020        f: impl IntoQuotedMut<'a, F, L> + Copy,
1021    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1022    where
1023        K: Clone,
1024        I: Fn() -> A + 'a,
1025        F: Fn(&mut A, V) -> Option<U> + 'a,
1026    {
1027        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1028        self.generator(
1029            init,
1030            q!({
1031                let orig = f;
1032                move |state, v| {
1033                    if let Some(out) = orig(state, v) {
1034                        Generate::Yield(out)
1035                    } else {
1036                        Generate::Break
1037                    }
1038                }
1039            }),
1040        )
1041    }
1042
1043    /// Iteratively processes the elements in each group using a state machine that can yield
1044    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1045    /// syntax in Rust, without requiring special syntax.
1046    ///
1047    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1048    /// state for each group. The second argument defines the processing logic, taking in a
1049    /// mutable reference to the group's state and the value to be processed. It emits a
1050    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1051    /// should be processed.
1052    ///
1053    /// # Example
1054    /// ```rust
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// process
1059    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1060    ///     .into_keyed()
1061    ///     .generator(
1062    ///         q!(|| 0),
1063    ///         q!(|acc, x| {
1064    ///             *acc += x;
1065    ///             if *acc > 100 {
1066    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1067    ///                     "done!".to_string()
1068    ///                 )
1069    ///             } else if *acc % 2 == 0 {
1070    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1071    ///                     "even".to_string()
1072    ///                 )
1073    ///             } else {
1074    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1075    ///             }
1076    ///         }),
1077    ///     )
1078    /// #   .entries()
1079    /// # }, |mut stream| async move {
1080    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1081    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1082    /// #     assert_eq!(stream.next().await.unwrap(), w);
1083    /// # }
1084    /// # }));
1085    /// ```
1086    pub fn generator<A, U, I, F>(
1087        self,
1088        init: impl IntoQuotedMut<'a, I, L> + Copy,
1089        f: impl IntoQuotedMut<'a, F, L> + Copy,
1090    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1091    where
1092        K: Clone,
1093        I: Fn() -> A + 'a,
1094        F: Fn(&mut A, V) -> Generate<U> + 'a,
1095    {
1096        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1097        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1098
1099        let scan_init = q!(|| HashMap::new())
1100            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1101            .into();
1102        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1103            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1104            if let Some(existing_state_value) = existing_state {
1105                match f(existing_state_value, v) {
1106                    Generate::Yield(out) => Some(Some((k, out))),
1107                    Generate::Return(out) => {
1108                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1109                        Some(Some((k, out)))
1110                    }
1111                    Generate::Break => {
1112                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1113                        Some(None)
1114                    }
1115                    Generate::Continue => Some(None),
1116                }
1117            } else {
1118                Some(None)
1119            }
1120        })
1121        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1122        .into();
1123
1124        let scan_node = HydroNode::Scan {
1125            init: scan_init,
1126            acc: scan_f,
1127            input: Box::new(self.ir_node.into_inner()),
1128            metadata: self.location.new_node_metadata(Stream::<
1129                Option<(K, U)>,
1130                L,
1131                B,
1132                TotalOrder,
1133                ExactlyOnce,
1134            >::collection_kind()),
1135        };
1136
1137        let flatten_f = q!(|d| d)
1138            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1139            .into();
1140        let flatten_node = HydroNode::FlatMap {
1141            f: flatten_f,
1142            input: Box::new(scan_node),
1143            metadata: self.location.new_node_metadata(KeyedStream::<
1144                K,
1145                U,
1146                L,
1147                B,
1148                TotalOrder,
1149                ExactlyOnce,
1150            >::collection_kind()),
1151        };
1152
1153        KeyedStream::new(self.location.clone(), flatten_node)
1154    }
1155
1156    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1157    /// in-order across the values in each group. But the aggregation function returns a boolean,
1158    /// which when true indicates that the aggregated result is complete and can be released to
1159    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1160    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1161    /// normal stream elements.
1162    ///
1163    /// # Example
1164    /// ```rust
1165    /// # use hydro_lang::prelude::*;
1166    /// # use futures::StreamExt;
1167    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1168    /// process
1169    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1170    ///     .into_keyed()
1171    ///     .fold_early_stop(
1172    ///         q!(|| 0),
1173    ///         q!(|acc, x| {
1174    ///             *acc += x;
1175    ///             x % 2 == 0
1176    ///         }),
1177    ///     )
1178    /// #   .entries()
1179    /// # }, |mut stream| async move {
1180    /// // Output: { 0: 2, 1: 9 }
1181    /// # for w in vec![(0, 2), (1, 9)] {
1182    /// #     assert_eq!(stream.next().await.unwrap(), w);
1183    /// # }
1184    /// # }));
1185    /// ```
1186    pub fn fold_early_stop<A, I, F>(
1187        self,
1188        init: impl IntoQuotedMut<'a, I, L> + Copy,
1189        f: impl IntoQuotedMut<'a, F, L> + Copy,
1190    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1191    where
1192        K: Clone,
1193        I: Fn() -> A + 'a,
1194        F: Fn(&mut A, V) -> bool + 'a,
1195    {
1196        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1197        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1198        let out_without_bound_cast = self.generator(
1199            q!(move || Some(init())),
1200            q!(move |key_state, v| {
1201                if let Some(key_state_value) = key_state.as_mut() {
1202                    if f(key_state_value, v) {
1203                        Generate::Return(key_state.take().unwrap())
1204                    } else {
1205                        Generate::Continue
1206                    }
1207                } else {
1208                    unreachable!()
1209                }
1210            }),
1211        );
1212
1213        KeyedSingleton::new(
1214            out_without_bound_cast.location.clone(),
1215            HydroNode::Cast {
1216                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1217                metadata: out_without_bound_cast
1218                    .location
1219                    .new_node_metadata(
1220                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1221                    ),
1222            },
1223        )
1224    }
1225
1226    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1227    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1228    /// otherwise the first element would be non-deterministic.
1229    ///
1230    /// # Example
1231    /// ```rust
1232    /// # use hydro_lang::prelude::*;
1233    /// # use futures::StreamExt;
1234    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1235    /// process
1236    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1237    ///     .into_keyed()
1238    ///     .first()
1239    /// #   .entries()
1240    /// # }, |mut stream| async move {
1241    /// // Output: { 0: 2, 1: 3 }
1242    /// # for w in vec![(0, 2), (1, 3)] {
1243    /// #     assert_eq!(stream.next().await.unwrap(), w);
1244    /// # }
1245    /// # }));
1246    /// ```
1247    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1248    where
1249        K: Clone,
1250    {
1251        self.fold_early_stop(
1252            q!(|| None),
1253            q!(|acc, v| {
1254                *acc = Some(v);
1255                true
1256            }),
1257        )
1258        .map(q!(|v| v.unwrap()))
1259    }
1260
1261    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1262    ///
1263    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1264    /// to depend on the order of elements in the group.
1265    ///
1266    /// If the input and output value types are the same and do not require initialization then use
1267    /// [`KeyedStream::reduce`].
1268    ///
1269    /// # Example
1270    /// ```rust
1271    /// # use hydro_lang::prelude::*;
1272    /// # use futures::StreamExt;
1273    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1274    /// let tick = process.tick();
1275    /// let numbers = process
1276    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1277    ///     .into_keyed();
1278    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279    /// batch
1280    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
1281    ///     .entries()
1282    ///     .all_ticks()
1283    /// # }, |mut stream| async move {
1284    /// // (1, 5), (2, 7)
1285    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1286    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1287    /// # }));
1288    /// ```
1289    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1290        self,
1291        init: impl IntoQuotedMut<'a, I, L>,
1292        comb: impl IntoQuotedMut<'a, F, L>,
1293    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1294        let init = init.splice_fn0_ctx(&self.location).into();
1295        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1296
1297        KeyedSingleton::new(
1298            self.location.clone(),
1299            HydroNode::FoldKeyed {
1300                init,
1301                acc: comb,
1302                input: Box::new(self.ir_node.into_inner()),
1303                metadata: self.location.new_node_metadata(KeyedSingleton::<
1304                    K,
1305                    A,
1306                    L,
1307                    B::WhenValueUnbounded,
1308                >::collection_kind()),
1309            },
1310        )
1311    }
1312
1313    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1314    ///
1315    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1316    /// to depend on the order of elements in the stream.
1317    ///
1318    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1319    ///
1320    /// # Example
1321    /// ```rust
1322    /// # use hydro_lang::prelude::*;
1323    /// # use futures::StreamExt;
1324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325    /// let tick = process.tick();
1326    /// let numbers = process
1327    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1328    ///     .into_keyed();
1329    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1330    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1331    /// # }, |mut stream| async move {
1332    /// // (1, 5), (2, 7)
1333    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1334    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1335    /// # }));
1336    /// ```
1337    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1338        self,
1339        comb: impl IntoQuotedMut<'a, F, L>,
1340    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1341        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1342
1343        KeyedSingleton::new(
1344            self.location.clone(),
1345            HydroNode::ReduceKeyed {
1346                f,
1347                input: Box::new(self.ir_node.into_inner()),
1348                metadata: self.location.new_node_metadata(KeyedSingleton::<
1349                    K,
1350                    V,
1351                    L,
1352                    B::WhenValueUnbounded,
1353                >::collection_kind()),
1354            },
1355        )
1356    }
1357
1358    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1359    ///
1360    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1361    /// to depend on the order of elements in the stream.
1362    ///
1363    /// # Example
1364    /// ```rust
1365    /// # use hydro_lang::prelude::*;
1366    /// # use futures::StreamExt;
1367    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1368    /// let tick = process.tick();
1369    /// let watermark = tick.singleton(q!(1));
1370    /// let numbers = process
1371    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1372    ///     .into_keyed();
1373    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1374    /// batch
1375    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1376    ///     .entries()
1377    ///     .all_ticks()
1378    /// # }, |mut stream| async move {
1379    /// // (2, 204)
1380    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1381    /// # }));
1382    /// ```
1383    pub fn reduce_watermark<O, F>(
1384        self,
1385        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1386        comb: impl IntoQuotedMut<'a, F, L>,
1387    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1388    where
1389        O: Clone,
1390        F: Fn(&mut V, V) + 'a,
1391    {
1392        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1393        check_matching_location(&self.location.root(), other.location.outer());
1394        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1395
1396        KeyedSingleton::new(
1397            self.location.clone(),
1398            HydroNode::ReduceKeyedWatermark {
1399                f,
1400                input: Box::new(self.ir_node.into_inner()),
1401                watermark: Box::new(other.ir_node.into_inner()),
1402                metadata: self.location.new_node_metadata(KeyedSingleton::<
1403                    K,
1404                    V,
1405                    L,
1406                    B::WhenValueUnbounded,
1407                >::collection_kind()),
1408            },
1409        )
1410    }
1411}
1412
1413impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1414where
1415    K: Eq + Hash,
1416    L: Location<'a>,
1417{
1418    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1419    ///
1420    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1421    ///
1422    /// If the input and output value types are the same and do not require initialization then use
1423    /// [`KeyedStream::reduce_commutative`].
1424    ///
1425    /// # Example
1426    /// ```rust
1427    /// # use hydro_lang::prelude::*;
1428    /// # use futures::StreamExt;
1429    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1430    /// let tick = process.tick();
1431    /// let numbers = process
1432    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1433    ///     .into_keyed();
1434    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1435    /// batch
1436    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1437    ///     .entries()
1438    ///     .all_ticks()
1439    /// # }, |mut stream| async move {
1440    /// // (1, 5), (2, 7)
1441    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1442    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1443    /// # }));
1444    /// ```
1445    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1446        self,
1447        init: impl IntoQuotedMut<'a, I, L>,
1448        comb: impl IntoQuotedMut<'a, F, L>,
1449    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1450        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1451            .fold(init, comb)
1452    }
1453
1454    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1455    ///
1456    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1457    ///
1458    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1459    ///
1460    /// # Example
1461    /// ```rust
1462    /// # use hydro_lang::prelude::*;
1463    /// # use futures::StreamExt;
1464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1465    /// let tick = process.tick();
1466    /// let numbers = process
1467    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1468    ///     .into_keyed();
1469    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1470    /// batch
1471    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1472    ///     .entries()
1473    ///     .all_ticks()
1474    /// # }, |mut stream| async move {
1475    /// // (1, 5), (2, 7)
1476    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1477    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1478    /// # }));
1479    /// ```
1480    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1481        self,
1482        comb: impl IntoQuotedMut<'a, F, L>,
1483    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1484        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1485            .reduce(comb)
1486    }
1487
1488    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1489    ///
1490    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1491    ///
1492    /// # Example
1493    /// ```rust
1494    /// # use hydro_lang::prelude::*;
1495    /// # use futures::StreamExt;
1496    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1497    /// let tick = process.tick();
1498    /// let watermark = tick.singleton(q!(1));
1499    /// let numbers = process
1500    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1501    ///     .into_keyed();
1502    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1503    /// batch
1504    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1505    ///     .entries()
1506    ///     .all_ticks()
1507    /// # }, |mut stream| async move {
1508    /// // (2, 204)
1509    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1510    /// # }));
1511    /// ```
1512    pub fn reduce_watermark_commutative<O2, F>(
1513        self,
1514        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1515        comb: impl IntoQuotedMut<'a, F, L>,
1516    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1517    where
1518        O2: Clone,
1519        F: Fn(&mut V, V) + 'a,
1520    {
1521        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1522            .reduce_watermark(other, comb)
1523    }
1524}
1525
1526impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1527where
1528    K: Eq + Hash,
1529    L: Location<'a>,
1530{
1531    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1532    ///
1533    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1534    ///
1535    /// If the input and output value types are the same and do not require initialization then use
1536    /// [`KeyedStream::reduce_idempotent`].
1537    ///
1538    /// # Example
1539    /// ```rust
1540    /// # use hydro_lang::prelude::*;
1541    /// # use futures::StreamExt;
1542    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1543    /// let tick = process.tick();
1544    /// let numbers = process
1545    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1546    ///     .into_keyed();
1547    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548    /// batch
1549    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1550    ///     .entries()
1551    ///     .all_ticks()
1552    /// # }, |mut stream| async move {
1553    /// // (1, false), (2, true)
1554    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1555    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1556    /// # }));
1557    /// ```
1558    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1559        self,
1560        init: impl IntoQuotedMut<'a, I, L>,
1561        comb: impl IntoQuotedMut<'a, F, L>,
1562    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1563        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1564            .fold(init, comb)
1565    }
1566
1567    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1568    ///
1569    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1570    ///
1571    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1572    ///
1573    /// # Example
1574    /// ```rust
1575    /// # use hydro_lang::prelude::*;
1576    /// # use futures::StreamExt;
1577    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1578    /// let tick = process.tick();
1579    /// let numbers = process
1580    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1581    ///     .into_keyed();
1582    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1583    /// batch
1584    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1585    ///     .entries()
1586    ///     .all_ticks()
1587    /// # }, |mut stream| async move {
1588    /// // (1, false), (2, true)
1589    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1590    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1591    /// # }));
1592    /// ```
1593    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1594        self,
1595        comb: impl IntoQuotedMut<'a, F, L>,
1596    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1597        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1598            .reduce(comb)
1599    }
1600
1601    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1602    ///
1603    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1604    ///
1605    /// # Example
1606    /// ```rust
1607    /// # use hydro_lang::prelude::*;
1608    /// # use futures::StreamExt;
1609    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1610    /// let tick = process.tick();
1611    /// let watermark = tick.singleton(q!(1));
1612    /// let numbers = process
1613    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1614    ///     .into_keyed();
1615    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1616    /// batch
1617    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1618    ///     .entries()
1619    ///     .all_ticks()
1620    /// # }, |mut stream| async move {
1621    /// // (2, true)
1622    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1623    /// # }));
1624    /// ```
1625    pub fn reduce_watermark_idempotent<O2, F>(
1626        self,
1627        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1628        comb: impl IntoQuotedMut<'a, F, L>,
1629    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1630    where
1631        O2: Clone,
1632        F: Fn(&mut V, V) + 'a,
1633    {
1634        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1635            .reduce_watermark(other, comb)
1636    }
1637}
1638
1639impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1640where
1641    K: Eq + Hash,
1642    L: Location<'a>,
1643{
1644    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1645    ///
1646    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1647    /// as there may be non-deterministic duplicates.
1648    ///
1649    /// If the input and output value types are the same and do not require initialization then use
1650    /// [`KeyedStream::reduce_commutative_idempotent`].
1651    ///
1652    /// # Example
1653    /// ```rust
1654    /// # use hydro_lang::prelude::*;
1655    /// # use futures::StreamExt;
1656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657    /// let tick = process.tick();
1658    /// let numbers = process
1659    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1660    ///     .into_keyed();
1661    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1662    /// batch
1663    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1664    ///     .entries()
1665    ///     .all_ticks()
1666    /// # }, |mut stream| async move {
1667    /// // (1, false), (2, true)
1668    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1669    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1670    /// # }));
1671    /// ```
1672    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1673        self,
1674        init: impl IntoQuotedMut<'a, I, L>,
1675        comb: impl IntoQuotedMut<'a, F, L>,
1676    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1677        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1678            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1679            .fold(init, comb)
1680    }
1681
1682    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1683    ///
1684    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1685    /// as there may be non-deterministic duplicates.
1686    ///
1687    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1688    ///
1689    /// # Example
1690    /// ```rust
1691    /// # use hydro_lang::prelude::*;
1692    /// # use futures::StreamExt;
1693    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1694    /// let tick = process.tick();
1695    /// let numbers = process
1696    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1697    ///     .into_keyed();
1698    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1699    /// batch
1700    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1701    ///     .entries()
1702    ///     .all_ticks()
1703    /// # }, |mut stream| async move {
1704    /// // (1, false), (2, true)
1705    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1706    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1707    /// # }));
1708    /// ```
1709    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1710        self,
1711        comb: impl IntoQuotedMut<'a, F, L>,
1712    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1713        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1714            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1715            .reduce(comb)
1716    }
1717
1718    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1719    ///
1720    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1721    /// as there may be non-deterministic duplicates.
1722    ///
1723    /// # Example
1724    /// ```rust
1725    /// # use hydro_lang::prelude::*;
1726    /// # use futures::StreamExt;
1727    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1728    /// let tick = process.tick();
1729    /// let watermark = tick.singleton(q!(1));
1730    /// let numbers = process
1731    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1732    ///     .into_keyed();
1733    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1734    /// batch
1735    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1736    ///     .entries()
1737    ///     .all_ticks()
1738    /// # }, |mut stream| async move {
1739    /// // (2, true)
1740    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1741    /// # }));
1742    /// ```
1743    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1744        self,
1745        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1746        comb: impl IntoQuotedMut<'a, F, L>,
1747    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1748    where
1749        O2: Clone,
1750        F: Fn(&mut V, V) + 'a,
1751    {
1752        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1753            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1754            .reduce_watermark(other, comb)
1755    }
1756
1757    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1758    /// whose keys are not in the bounded stream.
1759    ///
1760    /// # Example
1761    /// ```rust
1762    /// # use hydro_lang::prelude::*;
1763    /// # use futures::StreamExt;
1764    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1765    /// let tick = process.tick();
1766    /// let keyed_stream = process
1767    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1768    ///     .batch(&tick, nondet!(/** test */))
1769    ///     .into_keyed();
1770    /// let keys_to_remove = process
1771    ///     .source_iter(q!(vec![1, 2]))
1772    ///     .batch(&tick, nondet!(/** test */));
1773    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1774    /// #   .entries()
1775    /// # }, |mut stream| async move {
1776    /// // { 3: ['c'], 4: ['d'] }
1777    /// # for w in vec![(3, 'c'), (4, 'd')] {
1778    /// #     assert_eq!(stream.next().await.unwrap(), w);
1779    /// # }
1780    /// # }));
1781    /// ```
1782    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1783        self,
1784        other: Stream<K, L, Bounded, O2, R2>,
1785    ) -> Self {
1786        check_matching_location(&self.location, &other.location);
1787
1788        KeyedStream::new(
1789            self.location.clone(),
1790            HydroNode::AntiJoin {
1791                pos: Box::new(self.ir_node.into_inner()),
1792                neg: Box::new(other.ir_node.into_inner()),
1793                metadata: self.location.new_node_metadata(Self::collection_kind()),
1794            },
1795        )
1796    }
1797}
1798
1799impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1800where
1801    L: Location<'a>,
1802{
1803    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1804    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1805    ///
1806    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1807    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1808    /// argument that declares where the stream will be atomically processed. Batching a stream into
1809    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1810    /// [`Tick`] will introduce asynchrony.
1811    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1812        let out_location = Atomic { tick: tick.clone() };
1813        KeyedStream::new(
1814            out_location.clone(),
1815            HydroNode::BeginAtomic {
1816                inner: Box::new(self.ir_node.into_inner()),
1817                metadata: out_location
1818                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1819            },
1820        )
1821    }
1822
1823    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1824    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1825    /// the order of the input.
1826    ///
1827    /// # Non-Determinism
1828    /// The batch boundaries are non-deterministic and may change across executions.
1829    pub fn batch(
1830        self,
1831        tick: &Tick<L>,
1832        nondet: NonDet,
1833    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1834        let _ = nondet;
1835        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1836        KeyedStream::new(
1837            tick.clone(),
1838            HydroNode::Batch {
1839                inner: Box::new(self.ir_node.into_inner()),
1840                metadata: tick.new_node_metadata(
1841                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1842                ),
1843            },
1844        )
1845    }
1846}
1847
1848impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1849where
1850    L: Location<'a> + NoTick,
1851{
1852    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1853    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1854    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1855    /// used to create the atomic section.
1856    ///
1857    /// # Non-Determinism
1858    /// The batch boundaries are non-deterministic and may change across executions.
1859    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1860        let _ = nondet;
1861        KeyedStream::new(
1862            self.location.clone().tick,
1863            HydroNode::Batch {
1864                inner: Box::new(self.ir_node.into_inner()),
1865                metadata: self.location.tick.new_node_metadata(KeyedStream::<
1866                    K,
1867                    V,
1868                    Tick<L>,
1869                    Bounded,
1870                    O,
1871                    R,
1872                >::collection_kind(
1873                )),
1874            },
1875        )
1876    }
1877
1878    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1879    /// See [`KeyedStream::atomic`] for more details.
1880    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1881        KeyedStream::new(
1882            self.location.tick.l.clone(),
1883            HydroNode::EndAtomic {
1884                inner: Box::new(self.ir_node.into_inner()),
1885                metadata: self
1886                    .location
1887                    .tick
1888                    .l
1889                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1890            },
1891        )
1892    }
1893}
1894
1895impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1896where
1897    L: Location<'a>,
1898{
1899    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1900    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1901    /// is only present in one of the inputs, its values are passed through as-is). The output has
1902    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1903    ///
1904    /// Currently, both input streams must be [`Bounded`]. This operator will block
1905    /// on the first stream until all its elements are available. In a future version,
1906    /// we will relax the requirement on the `other` stream.
1907    ///
1908    /// # Example
1909    /// ```rust
1910    /// # use hydro_lang::prelude::*;
1911    /// # use futures::StreamExt;
1912    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1913    /// let tick = process.tick();
1914    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1915    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1916    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1917    /// # .entries()
1918    /// # }, |mut stream| async move {
1919    /// // { 0: [2, 1], 1: [4, 3] }
1920    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1921    /// #     assert_eq!(stream.next().await.unwrap(), w);
1922    /// # }
1923    /// # }));
1924    /// ```
1925    pub fn chain<O2: Ordering, R2: Retries>(
1926        self,
1927        other: KeyedStream<K, V, L, Bounded, O2, R2>,
1928    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1929    where
1930        O: MinOrder<O2>,
1931        R: MinRetries<R2>,
1932    {
1933        check_matching_location(&self.location, &other.location);
1934
1935        KeyedStream::new(
1936            self.location.clone(),
1937            HydroNode::Chain {
1938                first: Box::new(self.ir_node.into_inner()),
1939                second: Box::new(other.ir_node.into_inner()),
1940                metadata: self.location.new_node_metadata(KeyedStream::<
1941                    K,
1942                    V,
1943                    L,
1944                    Bounded,
1945                    <O as MinOrder<O2>>::Min,
1946                    <R as MinRetries<R2>>::Min,
1947                >::collection_kind()),
1948            },
1949        )
1950    }
1951}
1952
1953impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1954where
1955    L: Location<'a>,
1956{
1957    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1958    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1959    /// each key.
1960    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1961        KeyedStream::new(
1962            self.location.outer().clone(),
1963            HydroNode::YieldConcat {
1964                inner: Box::new(self.ir_node.into_inner()),
1965                metadata: self.location.outer().new_node_metadata(KeyedStream::<
1966                    K,
1967                    V,
1968                    L,
1969                    Unbounded,
1970                    O,
1971                    R,
1972                >::collection_kind(
1973                )),
1974            },
1975        )
1976    }
1977
1978    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1979    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1980    /// each key.
1981    ///
1982    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1983    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1984    /// stream's [`Tick`] context.
1985    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
1986        let out_location = Atomic {
1987            tick: self.location.clone(),
1988        };
1989
1990        KeyedStream::new(
1991            out_location.clone(),
1992            HydroNode::YieldConcat {
1993                inner: Box::new(self.ir_node.into_inner()),
1994                metadata: out_location.new_node_metadata(KeyedStream::<
1995                    K,
1996                    V,
1997                    Atomic<L>,
1998                    Unbounded,
1999                    O,
2000                    R,
2001                >::collection_kind()),
2002            },
2003        )
2004    }
2005
2006    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2007    /// tick `T` always has the entries of `self` at tick `T - 1`.
2008    ///
2009    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2010    ///
2011    /// This operator enables stateful iterative processing with ticks, by sending data from one
2012    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2013    ///
2014    /// # Example
2015    /// ```rust
2016    /// # use hydro_lang::prelude::*;
2017    /// # use futures::StreamExt;
2018    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2019    /// let tick = process.tick();
2020    /// # // ticks are lazy by default, forces the second tick to run
2021    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2022    /// # let batch_first_tick = process
2023    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2024    /// #   .batch(&tick, nondet!(/** test */))
2025    /// #   .into_keyed();
2026    /// # let batch_second_tick = process
2027    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2028    /// #   .batch(&tick, nondet!(/** test */))
2029    /// #   .defer_tick()
2030    /// #   .into_keyed(); // appears on the second tick
2031    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2032    /// # batch_first_tick.chain(batch_second_tick);
2033    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2034    ///     changes_across_ticks // from the current tick
2035    /// )
2036    /// # .entries().all_ticks()
2037    /// # }, |mut stream| async move {
2038    /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2039    /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2040    /// #     assert_eq!(stream.next().await.unwrap(), w);
2041    /// # }
2042    /// # }));
2043    /// ```
2044    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2045        KeyedStream::new(
2046            self.location.clone(),
2047            HydroNode::DeferTick {
2048                input: Box::new(self.ir_node.into_inner()),
2049                metadata: self.location.new_node_metadata(KeyedStream::<
2050                    K,
2051                    V,
2052                    Tick<L>,
2053                    Bounded,
2054                    O,
2055                    R,
2056                >::collection_kind()),
2057            },
2058        )
2059    }
2060}
2061
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Runs `reduce_watermark` with a constant watermark of `1` on a localhost deployment and
    /// checks that the first entry observed is the sum for key `2`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed();
        let sums = keyed_input.reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let out_port = sums
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out_recv = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that starts at `1` and grows by one
    /// every tick. An extra `(3, 103)` input is released only after the test sends a trigger, and
    /// must then show up in the output.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();
        let (trigger_port, trigger_stream) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        // Watermark cycle: initial value 1, each following tick sees the previous value plus one.
        let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let incremented_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(incremented_watermark);

        // The late input is only let through in ticks where a trigger message is present.
        let late_batch = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */));
        let trigger_present = trigger_stream
            .clone()
            .batch(&tick, nondet!(/** test */))
            .first();
        let late_input = late_batch.filter_if_some(trigger_present).all_ticks();

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(late_input)
            .into_keyed();
        let sums = keyed_input.reduce_watermark_commutative(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let out_port = sums
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = nodes.connect(trigger_port).await;
        let mut out_recv = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Release the gated `(3, 103)` input.
        trigger_sink.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    /// Folds each batch of a keyed input into per-key vectors under the exhaustive simulator.
    /// The test is marked `#[should_panic]`: the run is expected to panic rather than satisfy
    /// the assertion on the yielded entries.
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let batched = keyed_input.batch(&tick, nondet!(/** test */));
        let collected = batched.fold(q!(|| vec![]), q!(|acc, v| acc.push(v)));
        let out_port = collected
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }

    /// Batches a keyed input, concatenates the batches again, and runs an early-stopping
    /// per-key maximum over the result under the exhaustive simulator. Every explored execution
    /// must yield the same entries, and exactly 8 executions are expected.
    #[test]
    fn sim_batch_preserves_group_order() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let rebatched = keyed_input
            .batch(&tick, nondet!(/** test */))
            .all_ticks();
        let maxima = rebatched.fold_early_stop(
            q!(|| 0),
            q!(|acc, v| {
                *acc = std::cmp::max(v, *acc);
                *acc >= 2
            }),
        );
        let out_port = maxima.entries().send_bincode_external(&external);

        let instances = flow.sim().exhaustive(async |mut compiled| {
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        // Expected breakdown of the 8 explored executions:
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
        assert_eq!(instances, 8);
    }
}