hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this keyed stream is materialized.
    pub(crate) location: Loc,
    // The IR node that produces this keyed stream. It lives in a `RefCell` because
    // `Clone::clone` only receives `&self` yet rewrites the node in place into a
    // `HydroNode::Tee` so that the original and the clone share one upstream.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65    for KeyedStream<K, V, L, B, NoOrder, R>
66where
67    L: Location<'a>,
68{
69    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70        KeyedStream {
71            location: stream.location,
72            ir_node: stream.ir_node,
73            _phantom: PhantomData,
74        }
75    }
76}
77
/// Lets a bounded, tick-local keyed stream be used wherever a [`DeferTick`] collection is
/// expected.
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this path call is presumably meant to resolve to an inherent
        // `KeyedStream::defer_tick` defined elsewhere in this file (inherent associated
        // functions take precedence over trait methods) — confirm that method exists,
        // otherwise this would recurse.
        KeyedStream::defer_tick(self)
    }
}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90    L: Location<'a>,
91{
92    type Location = Tick<L>;
93
94    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95        KeyedStream {
96            location: location.clone(),
97            ir_node: RefCell::new(HydroNode::CycleSource {
98                ident,
99                metadata: location.new_node_metadata(
100                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101                ),
102            }),
103            _phantom: PhantomData,
104        }
105    }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132    for KeyedStream<K, V, L, B, O, R>
133where
134    L: Location<'a> + NoTick,
135{
136    type Location = L;
137
138    fn create_source(ident: syn::Ident, location: L) -> Self {
139        KeyedStream {
140            location: location.clone(),
141            ir_node: RefCell::new(HydroNode::CycleSource {
142                ident,
143                metadata: location
144                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152    for KeyedStream<K, V, L, B, O, R>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176    fn clone(&self) -> Self {
177        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179            *self.ir_node.borrow_mut() = HydroNode::Tee {
180                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181                metadata: self.location.new_node_metadata(Self::collection_kind()),
182            };
183        }
184
185        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186            KeyedStream {
187                location: self.location.clone(),
188                ir_node: HydroNode::Tee {
189                    inner: TeeNode(inner.0.clone()),
190                    metadata: metadata.clone(),
191                }
192                .into(),
193                _phantom: PhantomData,
194            }
195        } else {
196            unreachable!()
197        }
198    }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202    KeyedStream<K, V, L, B, O, R>
203{
204    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208        KeyedStream {
209            location,
210            ir_node: RefCell::new(ir_node),
211            _phantom: PhantomData,
212        }
213    }
214
215    /// Returns the [`CollectionKind`] corresponding to this type.
216    pub fn collection_kind() -> CollectionKind {
217        CollectionKind::KeyedStream {
218            bound: B::BOUND_KIND,
219            value_order: O::ORDERING_KIND,
220            value_retry: R::RETRIES_KIND,
221            key_type: stageleft::quote_type::<K>().into(),
222            value_type: stageleft::quote_type::<V>().into(),
223        }
224    }
225
    /// Returns the [`Location`] where this keyed stream is being materialized.
    ///
    /// The location is returned by reference; the keyed stream is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
230
231    /// Explicitly "casts" the keyed stream to a type with a different ordering
232    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233    /// by the type-system.
234    ///
235    /// # Non-Determinism
236    /// This function is used as an escape hatch, and any mistakes in the
237    /// provided ordering guarantee will propagate into the guarantees
238    /// for the rest of the program.
239    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240        if O::ORDERING_KIND == O2::ORDERING_KIND {
241            KeyedStream::new(self.location, self.ir_node.into_inner())
242        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243            // We can always weaken the ordering guarantee
244            KeyedStream::new(
245                self.location.clone(),
246                HydroNode::Cast {
247                    inner: Box::new(self.ir_node.into_inner()),
248                    metadata: self
249                        .location
250                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251                },
252            )
253        } else {
254            KeyedStream::new(
255                self.location.clone(),
256                HydroNode::ObserveNonDet {
257                    inner: Box::new(self.ir_node.into_inner()),
258                    metadata: self
259                        .location
260                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
261                },
262            )
263        }
264    }
265
266    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
267    /// which is always safe because that is the weakest possible guarantee.
268    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
269        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
270        self.assume_ordering::<NoOrder>(nondet)
271    }
272
273    /// Explicitly "casts" the keyed stream to a type with a different retries
274    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
275    /// be proven by the type-system.
276    ///
277    /// # Non-Determinism
278    /// This function is used as an escape hatch, and any mistakes in the
279    /// provided retries guarantee will propagate into the guarantees
280    /// for the rest of the program.
281    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
282        if R::RETRIES_KIND == R2::RETRIES_KIND {
283            KeyedStream::new(self.location, self.ir_node.into_inner())
284        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
285            // We can always weaken the retries guarantee
286            KeyedStream::new(
287                self.location.clone(),
288                HydroNode::Cast {
289                    inner: Box::new(self.ir_node.into_inner()),
290                    metadata: self
291                        .location
292                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
293                },
294            )
295        } else {
296            KeyedStream::new(
297                self.location.clone(),
298                HydroNode::ObserveNonDet {
299                    inner: Box::new(self.ir_node.into_inner()),
300                    metadata: self
301                        .location
302                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
303                },
304            )
305        }
306    }
307
308    /// Flattens the keyed stream into an unordered stream of key-value pairs.
309    ///
310    /// # Example
311    /// ```rust
312    /// # use hydro_lang::prelude::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
315    /// process
316    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
317    ///     .into_keyed()
318    ///     .entries()
319    /// # }, |mut stream| async move {
320    /// // (1, 2), (1, 3), (2, 4) in any order
321    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
322    /// #     assert_eq!(stream.next().await.unwrap(), w);
323    /// # }
324    /// # }));
325    /// ```
326    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
327        Stream::new(
328            self.location.clone(),
329            HydroNode::Cast {
330                inner: Box::new(self.ir_node.into_inner()),
331                metadata: self
332                    .location
333                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
334            },
335        )
336    }
337
338    /// Flattens the keyed stream into an unordered stream of only the values.
339    ///
340    /// # Example
341    /// ```rust
342    /// # use hydro_lang::prelude::*;
343    /// # use futures::StreamExt;
344    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
345    /// process
346    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
347    ///     .into_keyed()
348    ///     .values()
349    /// # }, |mut stream| async move {
350    /// // 2, 3, 4 in any order
351    /// # for w in vec![2, 3, 4] {
352    /// #     assert_eq!(stream.next().await.unwrap(), w);
353    /// # }
354    /// # }));
355    /// ```
356    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
357        self.entries().map(q!(|(_, v)| v))
358    }
359
360    /// Transforms each value by invoking `f` on each element, with keys staying the same
361    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
362    ///
363    /// If you do not want to modify the stream and instead only want to view
364    /// each item use [`KeyedStream::inspect`] instead.
365    ///
366    /// # Example
367    /// ```rust
368    /// # use hydro_lang::prelude::*;
369    /// # use futures::StreamExt;
370    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
371    /// process
372    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
373    ///     .into_keyed()
374    ///     .map(q!(|v| v + 1))
375    /// #   .entries()
376    /// # }, |mut stream| async move {
377    /// // { 1: [3, 4], 2: [5] }
378    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
379    /// #     assert_eq!(stream.next().await.unwrap(), w);
380    /// # }
381    /// # }));
382    /// ```
383    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
384    where
385        F: Fn(V) -> U + 'a,
386    {
387        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
388        let map_f = q!({
389            let orig = f;
390            move |(k, v)| (k, orig(v))
391        })
392        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
393        .into();
394
395        KeyedStream::new(
396            self.location.clone(),
397            HydroNode::Map {
398                f: map_f,
399                input: Box::new(self.ir_node.into_inner()),
400                metadata: self
401                    .location
402                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
403            },
404        )
405    }
406
407    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
408    /// re-grouped even they are tuples; instead they will be grouped under the original key.
409    ///
410    /// If you do not want to modify the stream and instead only want to view
411    /// each item use [`KeyedStream::inspect_with_key`] instead.
412    ///
413    /// # Example
414    /// ```rust
415    /// # use hydro_lang::prelude::*;
416    /// # use futures::StreamExt;
417    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418    /// process
419    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
420    ///     .into_keyed()
421    ///     .map_with_key(q!(|(k, v)| k + v))
422    /// #   .entries()
423    /// # }, |mut stream| async move {
424    /// // { 1: [3, 4], 2: [6] }
425    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
426    /// #     assert_eq!(stream.next().await.unwrap(), w);
427    /// # }
428    /// # }));
429    /// ```
430    pub fn map_with_key<U, F>(
431        self,
432        f: impl IntoQuotedMut<'a, F, L> + Copy,
433    ) -> KeyedStream<K, U, L, B, O, R>
434    where
435        F: Fn((K, V)) -> U + 'a,
436        K: Clone,
437    {
438        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
439        let map_f = q!({
440            let orig = f;
441            move |(k, v)| {
442                let out = orig((Clone::clone(&k), v));
443                (k, out)
444            }
445        })
446        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
447        .into();
448
449        KeyedStream::new(
450            self.location.clone(),
451            HydroNode::Map {
452                f: map_f,
453                input: Box::new(self.ir_node.into_inner()),
454                metadata: self
455                    .location
456                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
457            },
458        )
459    }
460
461    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
462    /// `f`, preserving the order of the elements within the group.
463    ///
464    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
465    /// not modify or take ownership of the values. If you need to modify the values while filtering
466    /// use [`KeyedStream::filter_map`] instead.
467    ///
468    /// # Example
469    /// ```rust
470    /// # use hydro_lang::prelude::*;
471    /// # use futures::StreamExt;
472    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473    /// process
474    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
475    ///     .into_keyed()
476    ///     .filter(q!(|&x| x > 2))
477    /// #   .entries()
478    /// # }, |mut stream| async move {
479    /// // { 1: [3], 2: [4] }
480    /// # for w in vec![(1, 3), (2, 4)] {
481    /// #     assert_eq!(stream.next().await.unwrap(), w);
482    /// # }
483    /// # }));
484    /// ```
485    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
486    where
487        F: Fn(&V) -> bool + 'a,
488    {
489        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
490        let filter_f = q!({
491            let orig = f;
492            move |t: &(_, _)| orig(&t.1)
493        })
494        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
495        .into();
496
497        KeyedStream::new(
498            self.location.clone(),
499            HydroNode::Filter {
500                f: filter_f,
501                input: Box::new(self.ir_node.into_inner()),
502                metadata: self.location.new_node_metadata(Self::collection_kind()),
503            },
504        )
505    }
506
507    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
508    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
509    ///
510    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
511    /// not modify or take ownership of the values. If you need to modify the values while filtering
512    /// use [`KeyedStream::filter_map_with_key`] instead.
513    ///
514    /// # Example
515    /// ```rust
516    /// # use hydro_lang::prelude::*;
517    /// # use futures::StreamExt;
518    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
519    /// process
520    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
521    ///     .into_keyed()
522    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
523    /// #   .entries()
524    /// # }, |mut stream| async move {
525    /// // { 1: [3], 2: [4] }
526    /// # for w in vec![(1, 3), (2, 4)] {
527    /// #     assert_eq!(stream.next().await.unwrap(), w);
528    /// # }
529    /// # }));
530    /// ```
531    pub fn filter_with_key<F>(
532        self,
533        f: impl IntoQuotedMut<'a, F, L> + Copy,
534    ) -> KeyedStream<K, V, L, B, O, R>
535    where
536        F: Fn(&(K, V)) -> bool + 'a,
537    {
538        let filter_f = f
539            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
540            .into();
541
542        KeyedStream::new(
543            self.location.clone(),
544            HydroNode::Filter {
545                f: filter_f,
546                input: Box::new(self.ir_node.into_inner()),
547                metadata: self.location.new_node_metadata(Self::collection_kind()),
548            },
549        )
550    }
551
552    /// An operator that both filters and maps each value, with keys staying the same.
553    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
554    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
555    ///
556    /// # Example
557    /// ```rust
558    /// # use hydro_lang::prelude::*;
559    /// # use futures::StreamExt;
560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561    /// process
562    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
563    ///     .into_keyed()
564    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
565    /// #   .entries()
566    /// # }, |mut stream| async move {
567    /// // { 1: [2], 2: [4] }
568    /// # for w in vec![(1, 2), (2, 4)] {
569    /// #     assert_eq!(stream.next().await.unwrap(), w);
570    /// # }
571    /// # }));
572    /// ```
573    pub fn filter_map<U, F>(
574        self,
575        f: impl IntoQuotedMut<'a, F, L> + Copy,
576    ) -> KeyedStream<K, U, L, B, O, R>
577    where
578        F: Fn(V) -> Option<U> + 'a,
579    {
580        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
581        let filter_map_f = q!({
582            let orig = f;
583            move |(k, v)| orig(v).map(|o| (k, o))
584        })
585        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
586        .into();
587
588        KeyedStream::new(
589            self.location.clone(),
590            HydroNode::FilterMap {
591                f: filter_map_f,
592                input: Box::new(self.ir_node.into_inner()),
593                metadata: self
594                    .location
595                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
596            },
597        )
598    }
599
600    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
601    /// re-grouped even they are tuples; instead they will be grouped under the original key.
602    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
603    ///
604    /// # Example
605    /// ```rust
606    /// # use hydro_lang::prelude::*;
607    /// # use futures::StreamExt;
608    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
609    /// process
610    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
611    ///     .into_keyed()
612    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
613    /// #   .entries()
614    /// # }, |mut stream| async move {
615    /// // { 2: [2] }
616    /// # for w in vec![(2, 2)] {
617    /// #     assert_eq!(stream.next().await.unwrap(), w);
618    /// # }
619    /// # }));
620    /// ```
621    pub fn filter_map_with_key<U, F>(
622        self,
623        f: impl IntoQuotedMut<'a, F, L> + Copy,
624    ) -> KeyedStream<K, U, L, B, O, R>
625    where
626        F: Fn((K, V)) -> Option<U> + 'a,
627        K: Clone,
628    {
629        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
630        let filter_map_f = q!({
631            let orig = f;
632            move |(k, v)| {
633                let out = orig((Clone::clone(&k), v));
634                out.map(|o| (k, o))
635            }
636        })
637        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
638        .into();
639
640        KeyedStream::new(
641            self.location.clone(),
642            HydroNode::FilterMap {
643                f: filter_map_f,
644                input: Box::new(self.ir_node.into_inner()),
645                metadata: self
646                    .location
647                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
648            },
649        )
650    }
651
652    /// For each value `v` in each group, transform `v` using `f` and then treat the
653    /// result as an [`Iterator`] to produce values one by one within the same group.
654    /// The implementation for [`Iterator`] for the output type `I` must produce items
655    /// in a **deterministic** order.
656    ///
657    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
658    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
659    ///
660    /// # Example
661    /// ```rust
662    /// # use hydro_lang::prelude::*;
663    /// # use futures::StreamExt;
664    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
665    /// process
666    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
667    ///     .into_keyed()
668    ///     .flat_map_ordered(q!(|x| x))
669    /// #   .entries()
670    /// # }, |mut stream| async move {
671    /// // { 1: [2, 3, 4], 2: [5, 6] }
672    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
673    /// #     assert_eq!(stream.next().await.unwrap(), w);
674    /// # }
675    /// # }));
676    /// ```
677    pub fn flat_map_ordered<U, I, F>(
678        self,
679        f: impl IntoQuotedMut<'a, F, L> + Copy,
680    ) -> KeyedStream<K, U, L, B, O, R>
681    where
682        I: IntoIterator<Item = U>,
683        F: Fn(V) -> I + 'a,
684        K: Clone,
685    {
686        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
687        let flat_map_f = q!({
688            let orig = f;
689            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
690        })
691        .splice_fn1_ctx::<(K, V), _>(&self.location)
692        .into();
693
694        KeyedStream::new(
695            self.location.clone(),
696            HydroNode::FlatMap {
697                f: flat_map_f,
698                input: Box::new(self.ir_node.into_inner()),
699                metadata: self
700                    .location
701                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
702            },
703        )
704    }
705
706    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
707    /// for the output type `I` to produce items in any order.
708    ///
709    /// # Example
710    /// ```rust
711    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
714    /// process
715    ///     .source_iter(q!(vec![
716    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
717    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
718    ///     ]))
719    ///     .into_keyed()
720    ///     .flat_map_unordered(q!(|x| x))
721    /// #   .entries()
722    /// # }, |mut stream| async move {
723    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
724    /// # let mut results = Vec::new();
725    /// # for _ in 0..4 {
726    /// #     results.push(stream.next().await.unwrap());
727    /// # }
728    /// # results.sort();
729    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
730    /// # }));
731    /// ```
732    pub fn flat_map_unordered<U, I, F>(
733        self,
734        f: impl IntoQuotedMut<'a, F, L> + Copy,
735    ) -> KeyedStream<K, U, L, B, NoOrder, R>
736    where
737        I: IntoIterator<Item = U>,
738        F: Fn(V) -> I + 'a,
739        K: Clone,
740    {
741        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
742        let flat_map_f = q!({
743            let orig = f;
744            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
745        })
746        .splice_fn1_ctx::<(K, V), _>(&self.location)
747        .into();
748
749        KeyedStream::new(
750            self.location.clone(),
751            HydroNode::FlatMap {
752                f: flat_map_f,
753                input: Box::new(self.ir_node.into_inner()),
754                metadata: self
755                    .location
756                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
757            },
758        )
759    }
760
761    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
762    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
763    /// items in a **deterministic** order.
764    ///
765    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
766    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
767    ///
768    /// # Example
769    /// ```rust
770    /// # use hydro_lang::prelude::*;
771    /// # use futures::StreamExt;
772    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773    /// process
774    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
775    ///     .into_keyed()
776    ///     .flatten_ordered()
777    /// #   .entries()
778    /// # }, |mut stream| async move {
779    /// // { 1: [2, 3, 4], 2: [5, 6] }
780    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
781    /// #     assert_eq!(stream.next().await.unwrap(), w);
782    /// # }
783    /// # }));
784    /// ```
785    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
786    where
787        V: IntoIterator<Item = U>,
788        K: Clone,
789    {
790        self.flat_map_ordered(q!(|d| d))
791    }
792
793    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
794    /// for the value type `V` to produce items in any order.
795    ///
796    /// # Example
797    /// ```rust
798    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
799    /// # use futures::StreamExt;
800    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
801    /// process
802    ///     .source_iter(q!(vec![
803    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
804    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
805    ///     ]))
806    ///     .into_keyed()
807    ///     .flatten_unordered()
808    /// #   .entries()
809    /// # }, |mut stream| async move {
810    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
811    /// # let mut results = Vec::new();
812    /// # for _ in 0..4 {
813    /// #     results.push(stream.next().await.unwrap());
814    /// # }
815    /// # results.sort();
816    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
817    /// # }));
818    /// ```
819    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
820    where
821        V: IntoIterator<Item = U>,
822        K: Clone,
823    {
824        self.flat_map_unordered(q!(|d| d))
825    }
826
827    /// An operator which allows you to "inspect" each element of a stream without
828    /// modifying it. The closure `f` is called on a reference to each value. This is
829    /// mainly useful for debugging, and should not be used to generate side-effects.
830    ///
831    /// # Example
832    /// ```rust
833    /// # use hydro_lang::prelude::*;
834    /// # use futures::StreamExt;
835    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836    /// process
837    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
838    ///     .into_keyed()
839    ///     .inspect(q!(|v| println!("{}", v)))
840    /// #   .entries()
841    /// # }, |mut stream| async move {
842    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
843    /// #     assert_eq!(stream.next().await.unwrap(), w);
844    /// # }
845    /// # }));
846    /// ```
847    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
848    where
849        F: Fn(&V) + 'a,
850    {
851        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
852        let inspect_f = q!({
853            let orig = f;
854            move |t: &(_, _)| orig(&t.1)
855        })
856        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
857        .into();
858
859        KeyedStream::new(
860            self.location.clone(),
861            HydroNode::Inspect {
862                f: inspect_f,
863                input: Box::new(self.ir_node.into_inner()),
864                metadata: self.location.new_node_metadata(Self::collection_kind()),
865            },
866        )
867    }
868
869    /// An operator which allows you to "inspect" each element of a stream without
870    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
871    /// mainly useful for debugging, and should not be used to generate side-effects.
872    ///
873    /// # Example
874    /// ```rust
875    /// # use hydro_lang::prelude::*;
876    /// # use futures::StreamExt;
877    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
878    /// process
879    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
880    ///     .into_keyed()
881    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
882    /// #   .entries()
883    /// # }, |mut stream| async move {
884    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
885    /// #     assert_eq!(stream.next().await.unwrap(), w);
886    /// # }
887    /// # }));
888    /// ```
889    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
890    where
891        F: Fn(&(K, V)) + 'a,
892    {
893        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
894
895        KeyedStream::new(
896            self.location.clone(),
897            HydroNode::Inspect {
898                f: inspect_f,
899                input: Box::new(self.ir_node.into_inner()),
900                metadata: self.location.new_node_metadata(Self::collection_kind()),
901            },
902        )
903    }
904
905    /// An operator which allows you to "name" a `HydroNode`.
906    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
907    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
908        {
909            let mut node = self.ir_node.borrow_mut();
910            let metadata = node.metadata_mut();
911            metadata.tag = Some(name.to_string());
912        }
913        self
914    }
915}
916
917impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
918    KeyedStream<K, V, L, Unbounded, O, R>
919{
920    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
921    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
922    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
923    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
924    ///
925    /// Currently, both input streams must be [`Unbounded`].
926    ///
927    /// # Example
928    /// ```rust
929    /// # use hydro_lang::prelude::*;
930    /// # use futures::StreamExt;
931    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
932    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
933    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
934    /// numbers1.interleave(numbers2)
935    /// #   .entries()
936    /// # }, |mut stream| async move {
937    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
938    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
939    /// #     assert_eq!(stream.next().await.unwrap(), w);
940    /// # }
941    /// # }));
942    /// ```
943    pub fn interleave<O2: Ordering, R2: Retries>(
944        self,
945        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
946    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
947    where
948        R: MinRetries<R2>,
949    {
950        let tick = self.location.tick();
951        // Because the outputs are unordered, we can interleave batches from both streams.
952        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
953        self.batch(&tick, nondet_batch_interleaving)
954            .weakest_ordering()
955            .chain(
956                other
957                    .batch(&tick, nondet_batch_interleaving)
958                    .weakest_ordering(),
959            )
960            .all_ticks()
961    }
962}
963
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant combines two independent decisions: whether an element is emitted for the
/// current input, and whether later inputs of the same group are still processed.
///
/// The common standard traits are derived so that values can be debug-printed, cloned, and
/// compared (for example in tests), whenever the element type `T` supports them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
976
977impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
978where
979    K: Eq + Hash,
980    L: Location<'a>,
981{
982    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
983    ///
984    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
985    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
986    /// early by returning `None`.
987    ///
988    /// The function takes a mutable reference to the accumulator and the current element, and returns
989    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, only that key's group is terminated: no further elements of
    /// the group are processed, while other groups continue normally.
991    ///
992    /// # Example
993    /// ```rust
994    /// # use hydro_lang::prelude::*;
995    /// # use futures::StreamExt;
996    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
997    /// process
998    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
999    ///     .into_keyed()
1000    ///     .scan(
1001    ///         q!(|| 0),
1002    ///         q!(|acc, x| {
1003    ///             *acc += x;
1004    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1005    ///         }),
1006    ///     )
1007    /// #   .entries()
1008    /// # }, |mut stream| async move {
1009    /// // Output: { 0: [1], 1: [3, 7] }
1010    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1011    /// #     assert_eq!(stream.next().await.unwrap(), w);
1012    /// # }
1013    /// # }));
1014    /// ```
1015    pub fn scan<A, U, I, F>(
1016        self,
1017        init: impl IntoQuotedMut<'a, I, L> + Copy,
1018        f: impl IntoQuotedMut<'a, F, L> + Copy,
1019    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1020    where
1021        K: Clone,
1022        I: Fn() -> A + 'a,
1023        F: Fn(&mut A, V) -> Option<U> + 'a,
1024    {
1025        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1026        self.generator(
1027            init,
1028            q!({
1029                let orig = f;
1030                move |state, v| {
1031                    if let Some(out) = orig(state, v) {
1032                        Generate::Yield(out)
1033                    } else {
1034                        Generate::Break
1035                    }
1036                }
1037            }),
1038        )
1039    }
1040
1041    /// Iteratively processes the elements in each group using a state machine that can yield
1042    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1043    /// syntax in Rust, without requiring special syntax.
1044    ///
1045    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1046    /// state for each group. The second argument defines the processing logic, taking in a
1047    /// mutable reference to the group's state and the value to be processed. It emits a
1048    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1049    /// should be processed.
1050    ///
1051    /// # Example
1052    /// ```rust
1053    /// # use hydro_lang::prelude::*;
1054    /// # use futures::StreamExt;
1055    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1056    /// process
1057    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1058    ///     .into_keyed()
1059    ///     .generator(
1060    ///         q!(|| 0),
1061    ///         q!(|acc, x| {
1062    ///             *acc += x;
1063    ///             if *acc > 100 {
1064    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1065    ///                     "done!".to_string()
1066    ///                 )
1067    ///             } else if *acc % 2 == 0 {
1068    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1069    ///                     "even".to_string()
1070    ///                 )
1071    ///             } else {
1072    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1073    ///             }
1074    ///         }),
1075    ///     )
1076    /// #   .entries()
1077    /// # }, |mut stream| async move {
1078    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1079    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1080    /// #     assert_eq!(stream.next().await.unwrap(), w);
1081    /// # }
1082    /// # }));
1083    /// ```
1084    pub fn generator<A, U, I, F>(
1085        self,
1086        init: impl IntoQuotedMut<'a, I, L> + Copy,
1087        f: impl IntoQuotedMut<'a, F, L> + Copy,
1088    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1089    where
1090        K: Clone,
1091        I: Fn() -> A + 'a,
1092        F: Fn(&mut A, V) -> Generate<U> + 'a,
1093    {
1094        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1095        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1096
1097        let scan_init = q!(|| HashMap::new())
1098            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1099            .into();
1100        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1101            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1102            if let Some(existing_state_value) = existing_state {
1103                match f(existing_state_value, v) {
1104                    Generate::Yield(out) => Some(Some((k, out))),
1105                    Generate::Return(out) => {
1106                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1107                        Some(Some((k, out)))
1108                    }
1109                    Generate::Break => {
1110                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1111                        Some(None)
1112                    }
1113                    Generate::Continue => Some(None),
1114                }
1115            } else {
1116                Some(None)
1117            }
1118        })
1119        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1120        .into();
1121
1122        let scan_node = HydroNode::Scan {
1123            init: scan_init,
1124            acc: scan_f,
1125            input: Box::new(self.ir_node.into_inner()),
1126            metadata: self.location.new_node_metadata(Stream::<
1127                Option<(K, U)>,
1128                L,
1129                B,
1130                TotalOrder,
1131                ExactlyOnce,
1132            >::collection_kind()),
1133        };
1134
1135        let flatten_f = q!(|d| d)
1136            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1137            .into();
1138        let flatten_node = HydroNode::FlatMap {
1139            f: flatten_f,
1140            input: Box::new(scan_node),
1141            metadata: self.location.new_node_metadata(KeyedStream::<
1142                K,
1143                U,
1144                L,
1145                B,
1146                TotalOrder,
1147                ExactlyOnce,
1148            >::collection_kind()),
1149        };
1150
1151        KeyedStream::new(self.location.clone(), flatten_node)
1152    }
1153
1154    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1155    /// in-order across the values in each group. But the aggregation function returns a boolean,
1156    /// which when true indicates that the aggregated result is complete and can be released to
1157    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1158    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1159    /// normal stream elements.
1160    ///
1161    /// # Example
1162    /// ```rust
1163    /// # use hydro_lang::prelude::*;
1164    /// # use futures::StreamExt;
1165    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1166    /// process
1167    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1168    ///     .into_keyed()
1169    ///     .fold_early_stop(
1170    ///         q!(|| 0),
1171    ///         q!(|acc, x| {
1172    ///             *acc += x;
1173    ///             x % 2 == 0
1174    ///         }),
1175    ///     )
1176    /// #   .entries()
1177    /// # }, |mut stream| async move {
1178    /// // Output: { 0: 2, 1: 9 }
1179    /// # for w in vec![(0, 2), (1, 9)] {
1180    /// #     assert_eq!(stream.next().await.unwrap(), w);
1181    /// # }
1182    /// # }));
1183    /// ```
1184    pub fn fold_early_stop<A, I, F>(
1185        self,
1186        init: impl IntoQuotedMut<'a, I, L> + Copy,
1187        f: impl IntoQuotedMut<'a, F, L> + Copy,
1188    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1189    where
1190        K: Clone,
1191        I: Fn() -> A + 'a,
1192        F: Fn(&mut A, V) -> bool + 'a,
1193    {
1194        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1195        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1196        let out_without_bound_cast = self.generator(
1197            q!(move || Some(init())),
1198            q!(move |key_state, v| {
1199                if let Some(key_state_value) = key_state.as_mut() {
1200                    if f(key_state_value, v) {
1201                        Generate::Return(key_state.take().unwrap())
1202                    } else {
1203                        Generate::Continue
1204                    }
1205                } else {
1206                    unreachable!()
1207                }
1208            }),
1209        );
1210
1211        KeyedSingleton::new(
1212            out_without_bound_cast.location.clone(),
1213            HydroNode::Cast {
1214                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1215                metadata: out_without_bound_cast
1216                    .location
1217                    .new_node_metadata(
1218                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1219                    ),
1220            },
1221        )
1222    }
1223
1224    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1225    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1226    /// otherwise the first element would be non-deterministic.
1227    ///
1228    /// # Example
1229    /// ```rust
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// process
1234    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1235    ///     .into_keyed()
1236    ///     .first()
1237    /// #   .entries()
1238    /// # }, |mut stream| async move {
1239    /// // Output: { 0: 2, 1: 3 }
1240    /// # for w in vec![(0, 2), (1, 3)] {
1241    /// #     assert_eq!(stream.next().await.unwrap(), w);
1242    /// # }
1243    /// # }));
1244    /// ```
1245    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1246    where
1247        K: Clone,
1248    {
1249        self.fold_early_stop(
1250            q!(|| None),
1251            q!(|acc, v| {
1252                *acc = Some(v);
1253                true
1254            }),
1255        )
1256        .map(q!(|v| v.unwrap()))
1257    }
1258
1259    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1260    ///
1261    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1262    /// to depend on the order of elements in the group.
1263    ///
1264    /// If the input and output value types are the same and do not require initialization then use
1265    /// [`KeyedStream::reduce`].
1266    ///
1267    /// # Example
1268    /// ```rust
1269    /// # use hydro_lang::prelude::*;
1270    /// # use futures::StreamExt;
1271    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1272    /// let tick = process.tick();
1273    /// let numbers = process
1274    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1275    ///     .into_keyed();
1276    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1277    /// batch
1278    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
1279    ///     .entries()
1280    ///     .all_ticks()
1281    /// # }, |mut stream| async move {
1282    /// // (1, 5), (2, 7)
1283    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1284    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1285    /// # }));
1286    /// ```
1287    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1288        self,
1289        init: impl IntoQuotedMut<'a, I, L>,
1290        comb: impl IntoQuotedMut<'a, F, L>,
1291    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1292        let init = init.splice_fn0_ctx(&self.location).into();
1293        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1294
1295        KeyedSingleton::new(
1296            self.location.clone(),
1297            HydroNode::FoldKeyed {
1298                init,
1299                acc: comb,
1300                input: Box::new(self.ir_node.into_inner()),
1301                metadata: self.location.new_node_metadata(KeyedSingleton::<
1302                    K,
1303                    A,
1304                    L,
1305                    B::WhenValueUnbounded,
1306                >::collection_kind()),
1307            },
1308        )
1309    }
1310
1311    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1312    ///
1313    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1314    /// to depend on the order of elements in the stream.
1315    ///
1316    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1317    ///
1318    /// # Example
1319    /// ```rust
1320    /// # use hydro_lang::prelude::*;
1321    /// # use futures::StreamExt;
1322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323    /// let tick = process.tick();
1324    /// let numbers = process
1325    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1326    ///     .into_keyed();
1327    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1328    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1329    /// # }, |mut stream| async move {
1330    /// // (1, 5), (2, 7)
1331    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1332    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1333    /// # }));
1334    /// ```
1335    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1336        self,
1337        comb: impl IntoQuotedMut<'a, F, L>,
1338    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1339        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1340
1341        KeyedSingleton::new(
1342            self.location.clone(),
1343            HydroNode::ReduceKeyed {
1344                f,
1345                input: Box::new(self.ir_node.into_inner()),
1346                metadata: self.location.new_node_metadata(KeyedSingleton::<
1347                    K,
1348                    V,
1349                    L,
1350                    B::WhenValueUnbounded,
1351                >::collection_kind()),
1352            },
1353        )
1354    }
1355
1356    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1357    ///
1358    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1359    /// to depend on the order of elements in the stream.
1360    ///
1361    /// # Example
1362    /// ```rust
1363    /// # use hydro_lang::prelude::*;
1364    /// # use futures::StreamExt;
1365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366    /// let tick = process.tick();
1367    /// let watermark = tick.singleton(q!(1));
1368    /// let numbers = process
1369    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1370    ///     .into_keyed();
1371    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1372    /// batch
1373    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1374    ///     .entries()
1375    ///     .all_ticks()
1376    /// # }, |mut stream| async move {
1377    /// // (2, 204)
1378    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1379    /// # }));
1380    /// ```
1381    pub fn reduce_watermark<O, F>(
1382        self,
1383        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1384        comb: impl IntoQuotedMut<'a, F, L>,
1385    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1386    where
1387        O: Clone,
1388        F: Fn(&mut V, V) + 'a,
1389    {
1390        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1391        check_matching_location(&self.location.root(), other.location.outer());
1392        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1393
1394        KeyedSingleton::new(
1395            self.location.clone(),
1396            HydroNode::ReduceKeyedWatermark {
1397                f,
1398                input: Box::new(self.ir_node.into_inner()),
1399                watermark: Box::new(other.ir_node.into_inner()),
1400                metadata: self.location.new_node_metadata(KeyedSingleton::<
1401                    K,
1402                    V,
1403                    L,
1404                    B::WhenValueUnbounded,
1405                >::collection_kind()),
1406            },
1407        )
1408    }
1409}
1410
1411impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1412where
1413    K: Eq + Hash,
1414    L: Location<'a>,
1415{
1416    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1417    ///
1418    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1419    ///
1420    /// If the input and output value types are the same and do not require initialization then use
1421    /// [`KeyedStream::reduce_commutative`].
1422    ///
1423    /// # Example
1424    /// ```rust
1425    /// # use hydro_lang::prelude::*;
1426    /// # use futures::StreamExt;
1427    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1428    /// let tick = process.tick();
1429    /// let numbers = process
1430    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1431    ///     .into_keyed();
1432    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1433    /// batch
1434    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1435    ///     .entries()
1436    ///     .all_ticks()
1437    /// # }, |mut stream| async move {
1438    /// // (1, 5), (2, 7)
1439    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1440    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1441    /// # }));
1442    /// ```
1443    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1444        self,
1445        init: impl IntoQuotedMut<'a, I, L>,
1446        comb: impl IntoQuotedMut<'a, F, L>,
1447    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1448        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1449            .fold(init, comb)
1450    }
1451
1452    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1453    ///
1454    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1455    ///
1456    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1457    ///
1458    /// # Example
1459    /// ```rust
1460    /// # use hydro_lang::prelude::*;
1461    /// # use futures::StreamExt;
1462    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1463    /// let tick = process.tick();
1464    /// let numbers = process
1465    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1466    ///     .into_keyed();
1467    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1468    /// batch
1469    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1470    ///     .entries()
1471    ///     .all_ticks()
1472    /// # }, |mut stream| async move {
1473    /// // (1, 5), (2, 7)
1474    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1475    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1476    /// # }));
1477    /// ```
1478    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1479        self,
1480        comb: impl IntoQuotedMut<'a, F, L>,
1481    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1482        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1483            .reduce(comb)
1484    }
1485
1486    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1487    ///
1488    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1489    ///
1490    /// # Example
1491    /// ```rust
1492    /// # use hydro_lang::prelude::*;
1493    /// # use futures::StreamExt;
1494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495    /// let tick = process.tick();
1496    /// let watermark = tick.singleton(q!(1));
1497    /// let numbers = process
1498    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1499    ///     .into_keyed();
1500    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1501    /// batch
1502    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1503    ///     .entries()
1504    ///     .all_ticks()
1505    /// # }, |mut stream| async move {
1506    /// // (2, 204)
1507    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1508    /// # }));
1509    /// ```
1510    pub fn reduce_watermark_commutative<O2, F>(
1511        self,
1512        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1513        comb: impl IntoQuotedMut<'a, F, L>,
1514    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1515    where
1516        O2: Clone,
1517        F: Fn(&mut V, V) + 'a,
1518    {
1519        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1520            .reduce_watermark(other, comb)
1521    }
1522}
1523
1524impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1525where
1526    K: Eq + Hash,
1527    L: Location<'a>,
1528{
1529    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1530    ///
1531    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1532    ///
1533    /// If the input and output value types are the same and do not require initialization then use
1534    /// [`KeyedStream::reduce_idempotent`].
1535    ///
1536    /// # Example
1537    /// ```rust
1538    /// # use hydro_lang::prelude::*;
1539    /// # use futures::StreamExt;
1540    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1541    /// let tick = process.tick();
1542    /// let numbers = process
1543    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1544    ///     .into_keyed();
1545    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1546    /// batch
1547    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1548    ///     .entries()
1549    ///     .all_ticks()
1550    /// # }, |mut stream| async move {
1551    /// // (1, false), (2, true)
1552    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1553    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1554    /// # }));
1555    /// ```
1556    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1557        self,
1558        init: impl IntoQuotedMut<'a, I, L>,
1559        comb: impl IntoQuotedMut<'a, F, L>,
1560    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1561        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1562            .fold(init, comb)
1563    }
1564
1565    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1566    ///
1567    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1568    ///
1569    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1570    ///
1571    /// # Example
1572    /// ```rust
1573    /// # use hydro_lang::prelude::*;
1574    /// # use futures::StreamExt;
1575    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1576    /// let tick = process.tick();
1577    /// let numbers = process
1578    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1579    ///     .into_keyed();
1580    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1581    /// batch
1582    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1583    ///     .entries()
1584    ///     .all_ticks()
1585    /// # }, |mut stream| async move {
1586    /// // (1, false), (2, true)
1587    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1588    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1589    /// # }));
1590    /// ```
1591    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1592        self,
1593        comb: impl IntoQuotedMut<'a, F, L>,
1594    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1595        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1596            .reduce(comb)
1597    }
1598
1599    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1600    ///
1601    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1602    ///
1603    /// # Example
1604    /// ```rust
1605    /// # use hydro_lang::prelude::*;
1606    /// # use futures::StreamExt;
1607    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1608    /// let tick = process.tick();
1609    /// let watermark = tick.singleton(q!(1));
1610    /// let numbers = process
1611    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1612    ///     .into_keyed();
1613    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1614    /// batch
1615    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1616    ///     .entries()
1617    ///     .all_ticks()
1618    /// # }, |mut stream| async move {
1619    /// // (2, true)
1620    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1621    /// # }));
1622    /// ```
1623    pub fn reduce_watermark_idempotent<O2, F>(
1624        self,
1625        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1626        comb: impl IntoQuotedMut<'a, F, L>,
1627    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1628    where
1629        O2: Clone,
1630        F: Fn(&mut V, V) + 'a,
1631    {
1632        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1633            .reduce_watermark(other, comb)
1634    }
1635}
1636
1637impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1638where
1639    K: Eq + Hash,
1640    L: Location<'a>,
1641{
1642    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1643    ///
1644    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1645    /// as there may be non-deterministic duplicates.
1646    ///
1647    /// If the input and output value types are the same and do not require initialization then use
1648    /// [`KeyedStream::reduce_commutative_idempotent`].
1649    ///
1650    /// # Example
1651    /// ```rust
1652    /// # use hydro_lang::prelude::*;
1653    /// # use futures::StreamExt;
1654    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1655    /// let tick = process.tick();
1656    /// let numbers = process
1657    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1658    ///     .into_keyed();
1659    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1660    /// batch
1661    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1662    ///     .entries()
1663    ///     .all_ticks()
1664    /// # }, |mut stream| async move {
1665    /// // (1, false), (2, true)
1666    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1667    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1668    /// # }));
1669    /// ```
1670    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1671        self,
1672        init: impl IntoQuotedMut<'a, I, L>,
1673        comb: impl IntoQuotedMut<'a, F, L>,
1674    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1675        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1676            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1677            .fold(init, comb)
1678    }
1679
1680    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1681    ///
1682    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1683    /// as there may be non-deterministic duplicates.
1684    ///
1685    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1686    ///
1687    /// # Example
1688    /// ```rust
1689    /// # use hydro_lang::prelude::*;
1690    /// # use futures::StreamExt;
1691    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1692    /// let tick = process.tick();
1693    /// let numbers = process
1694    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1695    ///     .into_keyed();
1696    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1697    /// batch
1698    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1699    ///     .entries()
1700    ///     .all_ticks()
1701    /// # }, |mut stream| async move {
1702    /// // (1, false), (2, true)
1703    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1704    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1705    /// # }));
1706    /// ```
1707    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1708        self,
1709        comb: impl IntoQuotedMut<'a, F, L>,
1710    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1711        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1712            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1713            .reduce(comb)
1714    }
1715
    /// A special case of [`KeyedStream::reduce_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1717    ///
1718    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1719    /// as there may be non-deterministic duplicates.
1720    ///
1721    /// # Example
1722    /// ```rust
1723    /// # use hydro_lang::prelude::*;
1724    /// # use futures::StreamExt;
1725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1726    /// let tick = process.tick();
1727    /// let watermark = tick.singleton(q!(1));
1728    /// let numbers = process
1729    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1730    ///     .into_keyed();
1731    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1732    /// batch
1733    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1734    ///     .entries()
1735    ///     .all_ticks()
1736    /// # }, |mut stream| async move {
1737    /// // (2, true)
1738    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1739    /// # }));
1740    /// ```
1741    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1742        self,
1743        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1744        comb: impl IntoQuotedMut<'a, F, L>,
1745    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1746    where
1747        O2: Clone,
1748        F: Fn(&mut V, V) + 'a,
1749    {
1750        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1751            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1752            .reduce_watermark(other, comb)
1753    }
1754
1755    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1756    /// whose keys are not in the bounded stream.
1757    ///
1758    /// # Example
1759    /// ```rust
1760    /// # use hydro_lang::prelude::*;
1761    /// # use futures::StreamExt;
1762    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1763    /// let tick = process.tick();
1764    /// let keyed_stream = process
1765    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1766    ///     .batch(&tick, nondet!(/** test */))
1767    ///     .into_keyed();
1768    /// let keys_to_remove = process
1769    ///     .source_iter(q!(vec![1, 2]))
1770    ///     .batch(&tick, nondet!(/** test */));
1771    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1772    /// #   .entries()
1773    /// # }, |mut stream| async move {
1774    /// // { 3: ['c'], 4: ['d'] }
1775    /// # for w in vec![(3, 'c'), (4, 'd')] {
1776    /// #     assert_eq!(stream.next().await.unwrap(), w);
1777    /// # }
1778    /// # }));
1779    /// ```
1780    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1781        self,
1782        other: Stream<K, L, Bounded, O2, R2>,
1783    ) -> Self {
1784        check_matching_location(&self.location, &other.location);
1785
1786        KeyedStream::new(
1787            self.location.clone(),
1788            HydroNode::AntiJoin {
1789                pos: Box::new(self.ir_node.into_inner()),
1790                neg: Box::new(other.ir_node.into_inner()),
1791                metadata: self.location.new_node_metadata(Self::collection_kind()),
1792            },
1793        )
1794    }
1795}
1796
1797impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1798where
1799    L: Location<'a>,
1800{
1801    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1802    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1803    ///
1804    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1805    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1806    /// argument that declares where the stream will be atomically processed. Batching a stream into
1807    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1808    /// [`Tick`] will introduce asynchrony.
1809    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1810        let out_location = Atomic { tick: tick.clone() };
1811        KeyedStream::new(
1812            out_location.clone(),
1813            HydroNode::BeginAtomic {
1814                inner: Box::new(self.ir_node.into_inner()),
1815                metadata: out_location
1816                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1817            },
1818        )
1819    }
1820
1821    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1822    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1823    /// the order of the input.
1824    ///
1825    /// # Non-Determinism
1826    /// The batch boundaries are non-deterministic and may change across executions.
1827    pub fn batch(
1828        self,
1829        tick: &Tick<L>,
1830        nondet: NonDet,
1831    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1832        let _ = nondet;
1833        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1834        KeyedStream::new(
1835            tick.clone(),
1836            HydroNode::Batch {
1837                inner: Box::new(self.ir_node.into_inner()),
1838                metadata: tick.new_node_metadata(
1839                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1840                ),
1841            },
1842        )
1843    }
1844}
1845
1846impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1847where
1848    L: Location<'a> + NoTick,
1849{
1850    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1851    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1852    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1853    /// used to create the atomic section.
1854    ///
1855    /// # Non-Determinism
1856    /// The batch boundaries are non-deterministic and may change across executions.
1857    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1858        let _ = nondet;
1859        KeyedStream::new(
1860            self.location.clone().tick,
1861            HydroNode::Batch {
1862                inner: Box::new(self.ir_node.into_inner()),
1863                metadata: self.location.tick.new_node_metadata(KeyedStream::<
1864                    K,
1865                    V,
1866                    Tick<L>,
1867                    Bounded,
1868                    O,
1869                    R,
1870                >::collection_kind(
1871                )),
1872            },
1873        )
1874    }
1875
1876    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1877    /// See [`KeyedStream::atomic`] for more details.
1878    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1879        KeyedStream::new(
1880            self.location.tick.l.clone(),
1881            HydroNode::EndAtomic {
1882                inner: Box::new(self.ir_node.into_inner()),
1883                metadata: self
1884                    .location
1885                    .tick
1886                    .l
1887                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1888            },
1889        )
1890    }
1891}
1892
1893impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1894where
1895    L: Location<'a>,
1896{
    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
    /// elements of the `self` stream, and then emitting the elements of the `other` stream (if a
    /// key is only present in one of the inputs, its values are passed through as-is). The output
    /// has a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1901    ///
1902    /// Currently, both input streams must be [`Bounded`]. This operator will block
1903    /// on the first stream until all its elements are available. In a future version,
1904    /// we will relax the requirement on the `other` stream.
1905    ///
1906    /// # Example
1907    /// ```rust
1908    /// # use hydro_lang::prelude::*;
1909    /// # use futures::StreamExt;
1910    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1911    /// let tick = process.tick();
1912    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1913    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1914    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1915    /// # .entries()
1916    /// # }, |mut stream| async move {
1917    /// // { 0: [2, 1], 1: [4, 3] }
1918    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1919    /// #     assert_eq!(stream.next().await.unwrap(), w);
1920    /// # }
1921    /// # }));
1922    /// ```
1923    pub fn chain<O2: Ordering, R2: Retries>(
1924        self,
1925        other: KeyedStream<K, V, L, Bounded, O2, R2>,
1926    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1927    where
1928        O: MinOrder<O2>,
1929        R: MinRetries<R2>,
1930    {
1931        check_matching_location(&self.location, &other.location);
1932
1933        KeyedStream::new(
1934            self.location.clone(),
1935            HydroNode::Chain {
1936                first: Box::new(self.ir_node.into_inner()),
1937                second: Box::new(other.ir_node.into_inner()),
1938                metadata: self.location.new_node_metadata(KeyedStream::<
1939                    K,
1940                    V,
1941                    L,
1942                    Bounded,
1943                    <O as MinOrder<O2>>::Min,
1944                    <R as MinRetries<R2>>::Min,
1945                >::collection_kind()),
1946            },
1947        )
1948    }
1949}
1950
1951impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1952where
1953    L: Location<'a>,
1954{
1955    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1956    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1957    /// each key.
1958    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1959        KeyedStream::new(
1960            self.location.outer().clone(),
1961            HydroNode::YieldConcat {
1962                inner: Box::new(self.ir_node.into_inner()),
1963                metadata: self.location.outer().new_node_metadata(KeyedStream::<
1964                    K,
1965                    V,
1966                    L,
1967                    Unbounded,
1968                    O,
1969                    R,
1970                >::collection_kind(
1971                )),
1972            },
1973        )
1974    }
1975
1976    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1977    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1978    /// each key.
1979    ///
1980    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1981    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1982    /// stream's [`Tick`] context.
1983    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
1984        let out_location = Atomic {
1985            tick: self.location.clone(),
1986        };
1987
1988        KeyedStream::new(
1989            out_location.clone(),
1990            HydroNode::YieldConcat {
1991                inner: Box::new(self.ir_node.into_inner()),
1992                metadata: out_location.new_node_metadata(KeyedStream::<
1993                    K,
1994                    V,
1995                    Atomic<L>,
1996                    Unbounded,
1997                    O,
1998                    R,
1999                >::collection_kind()),
2000            },
2001        )
2002    }
2003
2004    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2005    /// tick `T` always has the entries of `self` at tick `T - 1`.
2006    ///
2007    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2008    ///
2009    /// This operator enables stateful iterative processing with ticks, by sending data from one
2010    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2011    ///
2012    /// # Example
2013    /// ```rust
2014    /// # use hydro_lang::prelude::*;
2015    /// # use futures::StreamExt;
2016    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2017    /// let tick = process.tick();
2018    /// # // ticks are lazy by default, forces the second tick to run
2019    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2020    /// # let batch_first_tick = process
2021    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2022    /// #   .batch(&tick, nondet!(/** test */))
2023    /// #   .into_keyed();
2024    /// # let batch_second_tick = process
2025    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2026    /// #   .batch(&tick, nondet!(/** test */))
2027    /// #   .defer_tick()
2028    /// #   .into_keyed(); // appears on the second tick
2029    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2030    /// # batch_first_tick.chain(batch_second_tick);
2031    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2032    ///     changes_across_ticks // from the current tick
2033    /// )
2034    /// # .entries().all_ticks()
2035    /// # }, |mut stream| async move {
2036    /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2037    /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2038    /// #     assert_eq!(stream.next().await.unwrap(), w);
2039    /// # }
2040    /// # }));
2041    /// ```
2042    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2043        KeyedStream::new(
2044            self.location.clone(),
2045            HydroNode::DeferTick {
2046                input: Box::new(self.ir_node.into_inner()),
2047                metadata: self.location.new_node_metadata(KeyedStream::<
2048                    K,
2049                    V,
2050                    Tick<L>,
2051                    Bounded,
2052                    O,
2053                    R,
2054                >::collection_kind()),
2055            },
2056        )
2057    }
2058}
2059
2060#[cfg(test)]
2061mod tests {
2062    use futures::{SinkExt, StreamExt};
2063    use hydro_deploy::Deployment;
2064    use stageleft::q;
2065
2066    use crate::compile::builder::FlowBuilder;
2067    use crate::live_collections::stream::ExactlyOnce;
2068    use crate::location::Location;
2069    use crate::nondet::nondet;
2070
2071    #[tokio::test]
2072    async fn reduce_watermark_filter() {
2073        let mut deployment = Deployment::new();
2074
2075        let flow = FlowBuilder::new();
2076        let node = flow.process::<()>();
2077        let external = flow.external::<()>();
2078
2079        let node_tick = node.tick();
2080        let watermark = node_tick.singleton(q!(1));
2081
2082        let sum = node
2083            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2084            .into_keyed()
2085            .reduce_watermark(
2086                watermark,
2087                q!(|acc, v| {
2088                    *acc += v;
2089                }),
2090            )
2091            .snapshot(&node_tick, nondet!(/** test */))
2092            .entries()
2093            .all_ticks()
2094            .send_bincode_external(&external);
2095
2096        let nodes = flow
2097            .with_process(&node, deployment.Localhost())
2098            .with_external(&external, deployment.Localhost())
2099            .deploy(&mut deployment);
2100
2101        deployment.deploy().await.unwrap();
2102
2103        let mut out = nodes.connect(sum).await;
2104
2105        deployment.start().await.unwrap();
2106
2107        assert_eq!(out.next().await.unwrap(), (2, 204));
2108    }
2109
2110    #[tokio::test]
2111    async fn reduce_watermark_garbage_collect() {
2112        let mut deployment = Deployment::new();
2113
2114        let flow = FlowBuilder::new();
2115        let node = flow.process::<()>();
2116        let external = flow.external::<()>();
2117        let (tick_send, tick_trigger) =
2118            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2119
2120        let node_tick = node.tick();
2121        let (watermark_complete_cycle, watermark) =
2122            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2123        let next_watermark = watermark.clone().map(q!(|v| v + 1));
2124        watermark_complete_cycle.complete_next_tick(next_watermark);
2125
2126        let tick_triggered_input = node
2127            .source_iter(q!([(3, 103)]))
2128            .batch(&node_tick, nondet!(/** test */))
2129            .filter_if_some(
2130                tick_trigger
2131                    .clone()
2132                    .batch(&node_tick, nondet!(/** test */))
2133                    .first(),
2134            )
2135            .all_ticks();
2136
2137        let sum = node
2138            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2139            .interleave(tick_triggered_input)
2140            .into_keyed()
2141            .reduce_watermark_commutative(
2142                watermark,
2143                q!(|acc, v| {
2144                    *acc += v;
2145                }),
2146            )
2147            .snapshot(&node_tick, nondet!(/** test */))
2148            .entries()
2149            .all_ticks()
2150            .send_bincode_external(&external);
2151
2152        let nodes = flow
2153            .with_default_optimize()
2154            .with_process(&node, deployment.Localhost())
2155            .with_external(&external, deployment.Localhost())
2156            .deploy(&mut deployment);
2157
2158        deployment.deploy().await.unwrap();
2159
2160        let mut tick_send = nodes.connect(tick_send).await;
2161        let mut out_recv = nodes.connect(sum).await;
2162
2163        deployment.start().await.unwrap();
2164
2165        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2166
2167        tick_send.send(()).await.unwrap();
2168
2169        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2170    }
2171}