hydro_lang/live_collections/keyed_stream/mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// Streaming elements of type `V` grouped by a key of type `K`.
33///
34/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
35/// order of keys is non-deterministic but the order *within* each group may be deterministic.
36///
37/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
38/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
39/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
40///
41/// Type Parameters:
42/// - `K`: the type of the key for each group
43/// - `V`: the type of the elements inside each group
44/// - `Loc`: the [`Location`] where the keyed stream is materialized
45/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
46/// - `Order`: tracks whether the elements within each group have deterministic order
47///   ([`TotalOrder`]) or not ([`NoOrder`])
48/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
49///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
50pub struct KeyedStream<
51    K,
52    V,
53    Loc,
54    Bound: Boundedness = Unbounded,
55    Order: Ordering = TotalOrder,
56    Retry: Retries = ExactlyOnce,
57> {
58    pub(crate) location: Loc,
59    pub(crate) ir_node: RefCell<HydroNode>,
60
61    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
62}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65    for KeyedStream<K, V, L, B, NoOrder, R>
66where
67    L: Location<'a>,
68{
69    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70        KeyedStream {
71            location: stream.location,
72            ir_node: stream.ir_node,
73            _phantom: PhantomData,
74        }
75    }
76}
77
78impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
79where
80    L: Location<'a>,
81{
82    fn defer_tick(self) -> Self {
83        KeyedStream::defer_tick(self)
84    }
85}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90    L: Location<'a>,
91{
92    type Location = Tick<L>;
93
94    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95        KeyedStream {
96            location: location.clone(),
97            ir_node: RefCell::new(HydroNode::CycleSource {
98                ident,
99                metadata: location.new_node_metadata(
100                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101                ),
102            }),
103            _phantom: PhantomData,
104        }
105    }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132    for KeyedStream<K, V, L, B, O, R>
133where
134    L: Location<'a> + NoTick,
135{
136    type Location = L;
137
138    fn create_source(ident: syn::Ident, location: L) -> Self {
139        KeyedStream {
140            location: location.clone(),
141            ir_node: RefCell::new(HydroNode::CycleSource {
142                ident,
143                metadata: location
144                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152    for KeyedStream<K, V, L, B, O, R>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176    fn clone(&self) -> Self {
177        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179            *self.ir_node.borrow_mut() = HydroNode::Tee {
180                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181                metadata: self.location.new_node_metadata(Self::collection_kind()),
182            };
183        }
184
185        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186            KeyedStream {
187                location: self.location.clone(),
188                ir_node: HydroNode::Tee {
189                    inner: TeeNode(inner.0.clone()),
190                    metadata: metadata.clone(),
191                }
192                .into(),
193                _phantom: PhantomData,
194            }
195        } else {
196            unreachable!()
197        }
198    }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202    KeyedStream<K, V, L, B, O, R>
203{
204    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208        KeyedStream {
209            location,
210            ir_node: RefCell::new(ir_node),
211            _phantom: PhantomData,
212        }
213    }
214
215    /// Returns the [`CollectionKind`] corresponding to this type.
216    pub fn collection_kind() -> CollectionKind {
217        CollectionKind::KeyedStream {
218            bound: B::BOUND_KIND,
219            value_order: O::ORDERING_KIND,
220            value_retry: R::RETRIES_KIND,
221            key_type: stageleft::quote_type::<K>().into(),
222            value_type: stageleft::quote_type::<V>().into(),
223        }
224    }
225
226    /// Returns the [`Location`] where this keyed stream is being materialized.
227    pub fn location(&self) -> &L {
228        &self.location
229    }
230
231    /// Explicitly "casts" the keyed stream to a type with a different ordering
232    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233    /// by the type-system.
234    ///
235    /// # Non-Determinism
236    /// This function is used as an escape hatch, and any mistakes in the
237    /// provided ordering guarantee will propagate into the guarantees
238    /// for the rest of the program.
239    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240        if O::ORDERING_KIND == O2::ORDERING_KIND {
241            KeyedStream::new(self.location, self.ir_node.into_inner())
242        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243            // We can always weaken the ordering guarantee
244            KeyedStream::new(
245                self.location.clone(),
246                HydroNode::Cast {
247                    inner: Box::new(self.ir_node.into_inner()),
248                    metadata: self
249                        .location
250                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251                },
252            )
253        } else {
254            KeyedStream::new(
255                self.location.clone(),
256                HydroNode::ObserveNonDet {
257                    inner: Box::new(self.ir_node.into_inner()),
258                    trusted: false,
259                    metadata: self
260                        .location
261                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262                },
263            )
264        }
265    }
266
267    fn assume_ordering_trusted<O2: Ordering>(
268        self,
269        _nondet: NonDet,
270    ) -> KeyedStream<K, V, L, B, O2, R> {
271        if O::ORDERING_KIND == O2::ORDERING_KIND {
272            KeyedStream::new(self.location, self.ir_node.into_inner())
273        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
274            // We can always weaken the ordering guarantee
275            KeyedStream::new(
276                self.location.clone(),
277                HydroNode::Cast {
278                    inner: Box::new(self.ir_node.into_inner()),
279                    metadata: self
280                        .location
281                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
282                },
283            )
284        } else {
285            KeyedStream::new(
286                self.location.clone(),
287                HydroNode::ObserveNonDet {
288                    inner: Box::new(self.ir_node.into_inner()),
289                    trusted: true,
290                    metadata: self
291                        .location
292                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
293                },
294            )
295        }
296    }
297
298    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
299    /// which is always safe because that is the weakest possible guarantee.
300    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
301        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
302        self.assume_ordering::<NoOrder>(nondet)
303    }
304
305    /// Explicitly "casts" the keyed stream to a type with a different retries
306    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
307    /// be proven by the type-system.
308    ///
309    /// # Non-Determinism
310    /// This function is used as an escape hatch, and any mistakes in the
311    /// provided retries guarantee will propagate into the guarantees
312    /// for the rest of the program.
313    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
314        if R::RETRIES_KIND == R2::RETRIES_KIND {
315            KeyedStream::new(self.location, self.ir_node.into_inner())
316        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
317            // We can always weaken the retries guarantee
318            KeyedStream::new(
319                self.location.clone(),
320                HydroNode::Cast {
321                    inner: Box::new(self.ir_node.into_inner()),
322                    metadata: self
323                        .location
324                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
325                },
326            )
327        } else {
328            KeyedStream::new(
329                self.location.clone(),
330                HydroNode::ObserveNonDet {
331                    inner: Box::new(self.ir_node.into_inner()),
332                    trusted: false,
333                    metadata: self
334                        .location
335                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
336                },
337            )
338        }
339    }
340
341    /// Flattens the keyed stream into an unordered stream of key-value pairs.
342    ///
343    /// # Example
344    /// ```rust
345    /// # #[cfg(feature = "deploy")] {
346    /// # use hydro_lang::prelude::*;
347    /// # use futures::StreamExt;
348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
349    /// process
350    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
351    ///     .into_keyed()
352    ///     .entries()
353    /// # }, |mut stream| async move {
354    /// // (1, 2), (1, 3), (2, 4) in any order
355    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
356    /// #     assert_eq!(stream.next().await.unwrap(), w);
357    /// # }
358    /// # }));
359    /// # }
360    /// ```
361    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
362        Stream::new(
363            self.location.clone(),
364            HydroNode::Cast {
365                inner: Box::new(self.ir_node.into_inner()),
366                metadata: self
367                    .location
368                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
369            },
370        )
371    }
372
373    /// Flattens the keyed stream into an unordered stream of only the values.
374    ///
375    /// # Example
376    /// ```rust
377    /// # #[cfg(feature = "deploy")] {
378    /// # use hydro_lang::prelude::*;
379    /// # use futures::StreamExt;
380    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
381    /// process
382    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
383    ///     .into_keyed()
384    ///     .values()
385    /// # }, |mut stream| async move {
386    /// // 2, 3, 4 in any order
387    /// # for w in vec![2, 3, 4] {
388    /// #     assert_eq!(stream.next().await.unwrap(), w);
389    /// # }
390    /// # }));
391    /// # }
392    /// ```
393    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
394        self.entries().map(q!(|(_, v)| v))
395    }
396
397    /// Transforms each value by invoking `f` on each element, with keys staying the same
398    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
399    ///
400    /// If you do not want to modify the stream and instead only want to view
401    /// each item use [`KeyedStream::inspect`] instead.
402    ///
403    /// # Example
404    /// ```rust
405    /// # #[cfg(feature = "deploy")] {
406    /// # use hydro_lang::prelude::*;
407    /// # use futures::StreamExt;
408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
409    /// process
410    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
411    ///     .into_keyed()
412    ///     .map(q!(|v| v + 1))
413    /// #   .entries()
414    /// # }, |mut stream| async move {
415    /// // { 1: [3, 4], 2: [5] }
416    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
417    /// #     assert_eq!(stream.next().await.unwrap(), w);
418    /// # }
419    /// # }));
420    /// # }
421    /// ```
422    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
423    where
424        F: Fn(V) -> U + 'a,
425    {
426        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
427        let map_f = q!({
428            let orig = f;
429            move |(k, v)| (k, orig(v))
430        })
431        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
432        .into();
433
434        KeyedStream::new(
435            self.location.clone(),
436            HydroNode::Map {
437                f: map_f,
438                input: Box::new(self.ir_node.into_inner()),
439                metadata: self
440                    .location
441                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
442            },
443        )
444    }
445
446    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
447    /// re-grouped even they are tuples; instead they will be grouped under the original key.
448    ///
449    /// If you do not want to modify the stream and instead only want to view
450    /// each item use [`KeyedStream::inspect_with_key`] instead.
451    ///
452    /// # Example
453    /// ```rust
454    /// # #[cfg(feature = "deploy")] {
455    /// # use hydro_lang::prelude::*;
456    /// # use futures::StreamExt;
457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458    /// process
459    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
460    ///     .into_keyed()
461    ///     .map_with_key(q!(|(k, v)| k + v))
462    /// #   .entries()
463    /// # }, |mut stream| async move {
464    /// // { 1: [3, 4], 2: [6] }
465    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
466    /// #     assert_eq!(stream.next().await.unwrap(), w);
467    /// # }
468    /// # }));
469    /// # }
470    /// ```
471    pub fn map_with_key<U, F>(
472        self,
473        f: impl IntoQuotedMut<'a, F, L> + Copy,
474    ) -> KeyedStream<K, U, L, B, O, R>
475    where
476        F: Fn((K, V)) -> U + 'a,
477        K: Clone,
478    {
479        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
480        let map_f = q!({
481            let orig = f;
482            move |(k, v)| {
483                let out = orig((Clone::clone(&k), v));
484                (k, out)
485            }
486        })
487        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
488        .into();
489
490        KeyedStream::new(
491            self.location.clone(),
492            HydroNode::Map {
493                f: map_f,
494                input: Box::new(self.ir_node.into_inner()),
495                metadata: self
496                    .location
497                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
498            },
499        )
500    }
501
502    /// Prepends a new value to the key of each element in the stream, producing a new
503    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
504    /// occurs and the elements in each group preserve their original order.
505    ///
506    /// # Example
507    /// ```rust
508    /// # #[cfg(feature = "deploy")] {
509    /// # use hydro_lang::prelude::*;
510    /// # use futures::StreamExt;
511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512    /// process
513    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
514    ///     .into_keyed()
515    ///     .prefix_key(q!(|&(k, _)| k % 2))
516    /// #   .entries()
517    /// # }, |mut stream| async move {
518    /// // { (1, 1): [2, 3], (0, 2): [4] }
519    /// # for w in vec![((1, 1), 2), ((1, 1), 3), ((0, 2), 4)] {
520    /// #     assert_eq!(stream.next().await.unwrap(), w);
521    /// # }
522    /// # }));
523    /// # }
524    /// ```
525    pub fn prefix_key<K2, F>(
526        self,
527        f: impl IntoQuotedMut<'a, F, L> + Copy,
528    ) -> KeyedStream<(K2, K), V, L, B, O, R>
529    where
530        F: Fn(&(K, V)) -> K2 + 'a,
531    {
532        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
533        let map_f = q!({
534            let orig = f;
535            move |kv| {
536                let out = orig(&kv);
537                ((out, kv.0), kv.1)
538            }
539        })
540        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
541        .into();
542
543        KeyedStream::new(
544            self.location.clone(),
545            HydroNode::Map {
546                f: map_f,
547                input: Box::new(self.ir_node.into_inner()),
548                metadata: self
549                    .location
550                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
551            },
552        )
553    }
554
555    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
556    /// `f`, preserving the order of the elements within the group.
557    ///
558    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
559    /// not modify or take ownership of the values. If you need to modify the values while filtering
560    /// use [`KeyedStream::filter_map`] instead.
561    ///
562    /// # Example
563    /// ```rust
564    /// # #[cfg(feature = "deploy")] {
565    /// # use hydro_lang::prelude::*;
566    /// # use futures::StreamExt;
567    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
568    /// process
569    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
570    ///     .into_keyed()
571    ///     .filter(q!(|&x| x > 2))
572    /// #   .entries()
573    /// # }, |mut stream| async move {
574    /// // { 1: [3], 2: [4] }
575    /// # for w in vec![(1, 3), (2, 4)] {
576    /// #     assert_eq!(stream.next().await.unwrap(), w);
577    /// # }
578    /// # }));
579    /// # }
580    /// ```
581    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
582    where
583        F: Fn(&V) -> bool + 'a,
584    {
585        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
586        let filter_f = q!({
587            let orig = f;
588            move |t: &(_, _)| orig(&t.1)
589        })
590        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
591        .into();
592
593        KeyedStream::new(
594            self.location.clone(),
595            HydroNode::Filter {
596                f: filter_f,
597                input: Box::new(self.ir_node.into_inner()),
598                metadata: self.location.new_node_metadata(Self::collection_kind()),
599            },
600        )
601    }
602
603    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
604    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
605    ///
606    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
607    /// not modify or take ownership of the values. If you need to modify the values while filtering
608    /// use [`KeyedStream::filter_map_with_key`] instead.
609    ///
610    /// # Example
611    /// ```rust
612    /// # #[cfg(feature = "deploy")] {
613    /// # use hydro_lang::prelude::*;
614    /// # use futures::StreamExt;
615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616    /// process
617    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
618    ///     .into_keyed()
619    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
620    /// #   .entries()
621    /// # }, |mut stream| async move {
622    /// // { 1: [3], 2: [4] }
623    /// # for w in vec![(1, 3), (2, 4)] {
624    /// #     assert_eq!(stream.next().await.unwrap(), w);
625    /// # }
626    /// # }));
627    /// # }
628    /// ```
629    pub fn filter_with_key<F>(
630        self,
631        f: impl IntoQuotedMut<'a, F, L> + Copy,
632    ) -> KeyedStream<K, V, L, B, O, R>
633    where
634        F: Fn(&(K, V)) -> bool + 'a,
635    {
636        let filter_f = f
637            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
638            .into();
639
640        KeyedStream::new(
641            self.location.clone(),
642            HydroNode::Filter {
643                f: filter_f,
644                input: Box::new(self.ir_node.into_inner()),
645                metadata: self.location.new_node_metadata(Self::collection_kind()),
646            },
647        )
648    }
649
650    /// An operator that both filters and maps each value, with keys staying the same.
651    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
652    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660    /// process
661    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
662    ///     .into_keyed()
663    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
664    /// #   .entries()
665    /// # }, |mut stream| async move {
666    /// // { 1: [2], 2: [4] }
667    /// # for w in vec![(1, 2), (2, 4)] {
668    /// #     assert_eq!(stream.next().await.unwrap(), w);
669    /// # }
670    /// # }));
671    /// # }
672    /// ```
673    pub fn filter_map<U, F>(
674        self,
675        f: impl IntoQuotedMut<'a, F, L> + Copy,
676    ) -> KeyedStream<K, U, L, B, O, R>
677    where
678        F: Fn(V) -> Option<U> + 'a,
679    {
680        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
681        let filter_map_f = q!({
682            let orig = f;
683            move |(k, v)| orig(v).map(|o| (k, o))
684        })
685        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
686        .into();
687
688        KeyedStream::new(
689            self.location.clone(),
690            HydroNode::FilterMap {
691                f: filter_map_f,
692                input: Box::new(self.ir_node.into_inner()),
693                metadata: self
694                    .location
695                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
696            },
697        )
698    }
699
700    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
701    /// re-grouped even they are tuples; instead they will be grouped under the original key.
702    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
703    ///
704    /// # Example
705    /// ```rust
706    /// # #[cfg(feature = "deploy")] {
707    /// # use hydro_lang::prelude::*;
708    /// # use futures::StreamExt;
709    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
710    /// process
711    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
712    ///     .into_keyed()
713    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
714    /// #   .entries()
715    /// # }, |mut stream| async move {
716    /// // { 2: [2] }
717    /// # for w in vec![(2, 2)] {
718    /// #     assert_eq!(stream.next().await.unwrap(), w);
719    /// # }
720    /// # }));
721    /// # }
722    /// ```
723    pub fn filter_map_with_key<U, F>(
724        self,
725        f: impl IntoQuotedMut<'a, F, L> + Copy,
726    ) -> KeyedStream<K, U, L, B, O, R>
727    where
728        F: Fn((K, V)) -> Option<U> + 'a,
729        K: Clone,
730    {
731        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
732        let filter_map_f = q!({
733            let orig = f;
734            move |(k, v)| {
735                let out = orig((Clone::clone(&k), v));
736                out.map(|o| (k, o))
737            }
738        })
739        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
740        .into();
741
742        KeyedStream::new(
743            self.location.clone(),
744            HydroNode::FilterMap {
745                f: filter_map_f,
746                input: Box::new(self.ir_node.into_inner()),
747                metadata: self
748                    .location
749                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
750            },
751        )
752    }
753
754    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
755    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
756    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
757    ///
758    /// # Example
759    /// ```rust
760    /// # #[cfg(feature = "deploy")] {
761    /// # use hydro_lang::prelude::*;
762    /// # use futures::StreamExt;
763    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
764    /// let tick = process.tick();
765    /// let batch = process
766    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
767    ///   .into_keyed()
768    ///   .batch(&tick, nondet!(/** test */));
769    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
770    /// batch.cross_singleton(count).all_ticks().entries()
771    /// # }, |mut stream| async move {
772    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
773    /// # for w in vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))] {
774    /// #     assert_eq!(stream.next().await.unwrap(), w);
775    /// # }
776    /// # }));
777    /// # }
778    /// ```
779    pub fn cross_singleton<O2>(
780        self,
781        other: impl Into<Optional<O2, L, Bounded>>,
782    ) -> KeyedStream<K, (V, O2), L, B, O, R>
783    where
784        O2: Clone,
785    {
786        let other: Optional<O2, L, Bounded> = other.into();
787        check_matching_location(&self.location, &other.location);
788
789        Stream::new(
790            self.location.clone(),
791            HydroNode::CrossSingleton {
792                left: Box::new(self.ir_node.into_inner()),
793                right: Box::new(other.ir_node.into_inner()),
794                metadata: self
795                    .location
796                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
797            },
798        )
799        .map(q!(|((k, v), o2)| (k, (v, o2))))
800        .into_keyed()
801    }
802
803    /// For each value `v` in each group, transform `v` using `f` and then treat the
804    /// result as an [`Iterator`] to produce values one by one within the same group.
805    /// The implementation for [`Iterator`] for the output type `I` must produce items
806    /// in a **deterministic** order.
807    ///
808    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
809    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
810    ///
811    /// # Example
812    /// ```rust
813    /// # #[cfg(feature = "deploy")] {
814    /// # use hydro_lang::prelude::*;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817    /// process
818    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
819    ///     .into_keyed()
820    ///     .flat_map_ordered(q!(|x| x))
821    /// #   .entries()
822    /// # }, |mut stream| async move {
823    /// // { 1: [2, 3, 4], 2: [5, 6] }
824    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn flat_map_ordered<U, I, F>(
831        self,
832        f: impl IntoQuotedMut<'a, F, L> + Copy,
833    ) -> KeyedStream<K, U, L, B, O, R>
834    where
835        I: IntoIterator<Item = U>,
836        F: Fn(V) -> I + 'a,
837        K: Clone,
838    {
839        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
840        let flat_map_f = q!({
841            let orig = f;
842            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
843        })
844        .splice_fn1_ctx::<(K, V), _>(&self.location)
845        .into();
846
847        KeyedStream::new(
848            self.location.clone(),
849            HydroNode::FlatMap {
850                f: flat_map_f,
851                input: Box::new(self.ir_node.into_inner()),
852                metadata: self
853                    .location
854                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
855            },
856        )
857    }
858
859    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
860    /// for the output type `I` to produce items in any order.
861    ///
862    /// # Example
863    /// ```rust
864    /// # #[cfg(feature = "deploy")] {
865    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
866    /// # use futures::StreamExt;
867    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
868    /// process
869    ///     .source_iter(q!(vec![
870    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
871    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
872    ///     ]))
873    ///     .into_keyed()
874    ///     .flat_map_unordered(q!(|x| x))
875    /// #   .entries()
876    /// # }, |mut stream| async move {
877    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
878    /// # let mut results = Vec::new();
879    /// # for _ in 0..4 {
880    /// #     results.push(stream.next().await.unwrap());
881    /// # }
882    /// # results.sort();
883    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
884    /// # }));
885    /// # }
886    /// ```
887    pub fn flat_map_unordered<U, I, F>(
888        self,
889        f: impl IntoQuotedMut<'a, F, L> + Copy,
890    ) -> KeyedStream<K, U, L, B, NoOrder, R>
891    where
892        I: IntoIterator<Item = U>,
893        F: Fn(V) -> I + 'a,
894        K: Clone,
895    {
896        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
897        let flat_map_f = q!({
898            let orig = f;
899            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
900        })
901        .splice_fn1_ctx::<(K, V), _>(&self.location)
902        .into();
903
904        KeyedStream::new(
905            self.location.clone(),
906            HydroNode::FlatMap {
907                f: flat_map_f,
908                input: Box::new(self.ir_node.into_inner()),
909                metadata: self
910                    .location
911                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
912            },
913        )
914    }
915
916    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
917    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
918    /// items in a **deterministic** order.
919    ///
920    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
921    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
922    ///
923    /// # Example
924    /// ```rust
925    /// # #[cfg(feature = "deploy")] {
926    /// # use hydro_lang::prelude::*;
927    /// # use futures::StreamExt;
928    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
929    /// process
930    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
931    ///     .into_keyed()
932    ///     .flatten_ordered()
933    /// #   .entries()
934    /// # }, |mut stream| async move {
935    /// // { 1: [2, 3, 4], 2: [5, 6] }
936    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
937    /// #     assert_eq!(stream.next().await.unwrap(), w);
938    /// # }
939    /// # }));
940    /// # }
941    /// ```
942    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
943    where
944        V: IntoIterator<Item = U>,
945        K: Clone,
946    {
947        self.flat_map_ordered(q!(|d| d))
948    }
949
950    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
951    /// for the value type `V` to produce items in any order.
952    ///
953    /// # Example
954    /// ```rust
955    /// # #[cfg(feature = "deploy")] {
956    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
957    /// # use futures::StreamExt;
958    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
959    /// process
960    ///     .source_iter(q!(vec![
961    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
962    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
963    ///     ]))
964    ///     .into_keyed()
965    ///     .flatten_unordered()
966    /// #   .entries()
967    /// # }, |mut stream| async move {
968    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
969    /// # let mut results = Vec::new();
970    /// # for _ in 0..4 {
971    /// #     results.push(stream.next().await.unwrap());
972    /// # }
973    /// # results.sort();
974    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
975    /// # }));
976    /// # }
977    /// ```
978    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
979    where
980        V: IntoIterator<Item = U>,
981        K: Clone,
982    {
983        self.flat_map_unordered(q!(|d| d))
984    }
985
986    /// An operator which allows you to "inspect" each element of a stream without
987    /// modifying it. The closure `f` is called on a reference to each value. This is
988    /// mainly useful for debugging, and should not be used to generate side-effects.
989    ///
990    /// # Example
991    /// ```rust
992    /// # #[cfg(feature = "deploy")] {
993    /// # use hydro_lang::prelude::*;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996    /// process
997    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
998    ///     .into_keyed()
999    ///     .inspect(q!(|v| println!("{}", v)))
1000    /// #   .entries()
1001    /// # }, |mut stream| async move {
1002    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1003    /// #     assert_eq!(stream.next().await.unwrap(), w);
1004    /// # }
1005    /// # }));
1006    /// # }
1007    /// ```
1008    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1009    where
1010        F: Fn(&V) + 'a,
1011    {
1012        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1013        let inspect_f = q!({
1014            let orig = f;
1015            move |t: &(_, _)| orig(&t.1)
1016        })
1017        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1018        .into();
1019
1020        KeyedStream::new(
1021            self.location.clone(),
1022            HydroNode::Inspect {
1023                f: inspect_f,
1024                input: Box::new(self.ir_node.into_inner()),
1025                metadata: self.location.new_node_metadata(Self::collection_kind()),
1026            },
1027        )
1028    }
1029
1030    /// An operator which allows you to "inspect" each element of a stream without
1031    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1032    /// mainly useful for debugging, and should not be used to generate side-effects.
1033    ///
1034    /// # Example
1035    /// ```rust
1036    /// # #[cfg(feature = "deploy")] {
1037    /// # use hydro_lang::prelude::*;
1038    /// # use futures::StreamExt;
1039    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1040    /// process
1041    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1042    ///     .into_keyed()
1043    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1044    /// #   .entries()
1045    /// # }, |mut stream| async move {
1046    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1047    /// #     assert_eq!(stream.next().await.unwrap(), w);
1048    /// # }
1049    /// # }));
1050    /// # }
1051    /// ```
1052    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1053    where
1054        F: Fn(&(K, V)) + 'a,
1055    {
1056        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1057
1058        KeyedStream::new(
1059            self.location.clone(),
1060            HydroNode::Inspect {
1061                f: inspect_f,
1062                input: Box::new(self.ir_node.into_inner()),
1063                metadata: self.location.new_node_metadata(Self::collection_kind()),
1064            },
1065        )
1066    }
1067
1068    /// An operator which allows you to "name" a `HydroNode`.
1069    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1070    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1071        {
1072            let mut node = self.ir_node.borrow_mut();
1073            let metadata = node.metadata_mut();
1074            metadata.tag = Some(name.to_string());
1075        }
1076        self
1077    }
1078}
1079
1080impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1081    KeyedStream<(K1, K2), V, L, B, O, R>
1082{
1083    /// Produces a new keyed stream by dropping the first element of the compound key.
1084    ///
1085    /// Because multiple keys may share the same suffix, this operation results in re-grouping
1086    /// of the values under the new keys. The values across groups with the same new key
1087    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1088    ///
1089    /// # Example
1090    /// ```rust
1091    /// # #[cfg(feature = "deploy")] {
1092    /// # use hydro_lang::prelude::*;
1093    /// # use futures::StreamExt;
1094    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095    /// process
1096    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1097    ///     .into_keyed()
1098    ///     .drop_key_prefix()
1099    /// #   .entries()
1100    /// # }, |mut stream| async move {
1101    /// // { 10: [2, 3], 20: [4] }
1102    /// # for w in vec![(10, 2), (10, 3), (20, 4)] {
1103    /// #     assert_eq!(stream.next().await.unwrap(), w);
1104    /// # }
1105    /// # }));
1106    /// # }
1107    /// ```
1108    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1109        self.entries()
1110            .map(q!(|((_k1, k2), v)| (k2, v)))
1111            .into_keyed()
1112    }
1113}
1114
1115impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1116    KeyedStream<K, V, L, Unbounded, O, R>
1117{
1118    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1119    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1120    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1121    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1122    ///
1123    /// Currently, both input streams must be [`Unbounded`].
1124    ///
1125    /// # Example
1126    /// ```rust
1127    /// # #[cfg(feature = "deploy")] {
1128    /// # use hydro_lang::prelude::*;
1129    /// # use futures::StreamExt;
1130    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1131    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
1132    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
1133    /// numbers1.interleave(numbers2)
1134    /// #   .entries()
1135    /// # }, |mut stream| async move {
1136    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1137    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
1138    /// #     assert_eq!(stream.next().await.unwrap(), w);
1139    /// # }
1140    /// # }));
1141    /// # }
1142    /// ```
1143    pub fn interleave<O2: Ordering, R2: Retries>(
1144        self,
1145        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1146    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1147    where
1148        R: MinRetries<R2>,
1149    {
1150        let tick = self.location.tick();
1151        // Because the outputs are unordered, we can interleave batches from both streams.
1152        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1153        self.batch(&tick, nondet_batch_interleaving)
1154            .weakest_ordering()
1155            .chain(
1156                other
1157                    .batch(&tick, nondet_batch_interleaving)
1158                    .weakest_ordering(),
1159            )
1160            .all_ticks()
1161    }
1162}
1163
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each decision applies only to the group (key) whose value is currently being processed;
/// other groups keep running independently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1176
1177impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1178where
1179    L: Location<'a>,
1180{
1181    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1182    ///
1183    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
1184    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1185    /// early by returning `None`.
1186    ///
1187    /// The function takes a mutable reference to the accumulator and the current element, and returns
1188    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1189    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1190    ///
1191    /// # Example
1192    /// ```rust
1193    /// # #[cfg(feature = "deploy")] {
1194    /// # use hydro_lang::prelude::*;
1195    /// # use futures::StreamExt;
1196    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1197    /// process
1198    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1199    ///     .into_keyed()
1200    ///     .scan(
1201    ///         q!(|| 0),
1202    ///         q!(|acc, x| {
1203    ///             *acc += x;
1204    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1205    ///         }),
1206    ///     )
1207    /// #   .entries()
1208    /// # }, |mut stream| async move {
1209    /// // Output: { 0: [1], 1: [3, 7] }
1210    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1211    /// #     assert_eq!(stream.next().await.unwrap(), w);
1212    /// # }
1213    /// # }));
1214    /// # }
1215    /// ```
1216    pub fn scan<A, U, I, F>(
1217        self,
1218        init: impl IntoQuotedMut<'a, I, L> + Copy,
1219        f: impl IntoQuotedMut<'a, F, L> + Copy,
1220    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1221    where
1222        K: Clone + Eq + Hash,
1223        I: Fn() -> A + 'a,
1224        F: Fn(&mut A, V) -> Option<U> + 'a,
1225    {
1226        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1227        self.generator(
1228            init,
1229            q!({
1230                let orig = f;
1231                move |state, v| {
1232                    if let Some(out) = orig(state, v) {
1233                        Generate::Yield(out)
1234                    } else {
1235                        Generate::Break
1236                    }
1237                }
1238            }),
1239        )
1240    }
1241
1242    /// Iteratively processes the elements in each group using a state machine that can yield
1243    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1244    /// syntax in Rust, without requiring special syntax.
1245    ///
1246    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1247    /// state for each group. The second argument defines the processing logic, taking in a
1248    /// mutable reference to the group's state and the value to be processed. It emits a
1249    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1250    /// should be processed.
1251    ///
1252    /// # Example
1253    /// ```rust
1254    /// # #[cfg(feature = "deploy")] {
1255    /// # use hydro_lang::prelude::*;
1256    /// # use futures::StreamExt;
1257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1258    /// process
1259    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1260    ///     .into_keyed()
1261    ///     .generator(
1262    ///         q!(|| 0),
1263    ///         q!(|acc, x| {
1264    ///             *acc += x;
1265    ///             if *acc > 100 {
1266    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1267    ///                     "done!".to_string()
1268    ///                 )
1269    ///             } else if *acc % 2 == 0 {
1270    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1271    ///                     "even".to_string()
1272    ///                 )
1273    ///             } else {
1274    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1275    ///             }
1276    ///         }),
1277    ///     )
1278    /// #   .entries()
1279    /// # }, |mut stream| async move {
1280    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1281    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1282    /// #     assert_eq!(stream.next().await.unwrap(), w);
1283    /// # }
1284    /// # }));
1285    /// # }
1286    /// ```
1287    pub fn generator<A, U, I, F>(
1288        self,
1289        init: impl IntoQuotedMut<'a, I, L> + Copy,
1290        f: impl IntoQuotedMut<'a, F, L> + Copy,
1291    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1292    where
1293        K: Clone + Eq + Hash,
1294        I: Fn() -> A + 'a,
1295        F: Fn(&mut A, V) -> Generate<U> + 'a,
1296    {
1297        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1298        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1299
1300        let scan_init = q!(|| HashMap::new())
1301            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1302            .into();
1303        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1304            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1305            if let Some(existing_state_value) = existing_state {
1306                match f(existing_state_value, v) {
1307                    Generate::Yield(out) => Some(Some((k, out))),
1308                    Generate::Return(out) => {
1309                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1310                        Some(Some((k, out)))
1311                    }
1312                    Generate::Break => {
1313                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1314                        Some(None)
1315                    }
1316                    Generate::Continue => Some(None),
1317                }
1318            } else {
1319                Some(None)
1320            }
1321        })
1322        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1323        .into();
1324
1325        let scan_node = HydroNode::Scan {
1326            init: scan_init,
1327            acc: scan_f,
1328            input: Box::new(self.ir_node.into_inner()),
1329            metadata: self.location.new_node_metadata(Stream::<
1330                Option<(K, U)>,
1331                L,
1332                B,
1333                TotalOrder,
1334                ExactlyOnce,
1335            >::collection_kind()),
1336        };
1337
1338        let flatten_f = q!(|d| d)
1339            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1340            .into();
1341        let flatten_node = HydroNode::FlatMap {
1342            f: flatten_f,
1343            input: Box::new(scan_node),
1344            metadata: self.location.new_node_metadata(KeyedStream::<
1345                K,
1346                U,
1347                L,
1348                B,
1349                TotalOrder,
1350                ExactlyOnce,
1351            >::collection_kind()),
1352        };
1353
1354        KeyedStream::new(self.location.clone(), flatten_node)
1355    }
1356
1357    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1358    /// in-order across the values in each group. But the aggregation function returns a boolean,
1359    /// which when true indicates that the aggregated result is complete and can be released to
1360    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1361    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1362    /// normal stream elements.
1363    ///
1364    /// # Example
1365    /// ```rust
1366    /// # #[cfg(feature = "deploy")] {
1367    /// # use hydro_lang::prelude::*;
1368    /// # use futures::StreamExt;
1369    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1370    /// process
1371    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1372    ///     .into_keyed()
1373    ///     .fold_early_stop(
1374    ///         q!(|| 0),
1375    ///         q!(|acc, x| {
1376    ///             *acc += x;
1377    ///             x % 2 == 0
1378    ///         }),
1379    ///     )
1380    /// #   .entries()
1381    /// # }, |mut stream| async move {
1382    /// // Output: { 0: 2, 1: 9 }
1383    /// # for w in vec![(0, 2), (1, 9)] {
1384    /// #     assert_eq!(stream.next().await.unwrap(), w);
1385    /// # }
1386    /// # }));
1387    /// # }
1388    /// ```
1389    pub fn fold_early_stop<A, I, F>(
1390        self,
1391        init: impl IntoQuotedMut<'a, I, L> + Copy,
1392        f: impl IntoQuotedMut<'a, F, L> + Copy,
1393    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1394    where
1395        K: Clone + Eq + Hash,
1396        I: Fn() -> A + 'a,
1397        F: Fn(&mut A, V) -> bool + 'a,
1398    {
1399        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1400        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1401        let out_without_bound_cast = self.generator(
1402            q!(move || Some(init())),
1403            q!(move |key_state, v| {
1404                if let Some(key_state_value) = key_state.as_mut() {
1405                    if f(key_state_value, v) {
1406                        Generate::Return(key_state.take().unwrap())
1407                    } else {
1408                        Generate::Continue
1409                    }
1410                } else {
1411                    unreachable!()
1412                }
1413            }),
1414        );
1415
1416        KeyedSingleton::new(
1417            out_without_bound_cast.location.clone(),
1418            HydroNode::Cast {
1419                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1420                metadata: out_without_bound_cast
1421                    .location
1422                    .new_node_metadata(
1423                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1424                    ),
1425            },
1426        )
1427    }
1428
1429    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1430    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1431    /// otherwise the first element would be non-deterministic.
1432    ///
1433    /// # Example
1434    /// ```rust
1435    /// # #[cfg(feature = "deploy")] {
1436    /// # use hydro_lang::prelude::*;
1437    /// # use futures::StreamExt;
1438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439    /// process
1440    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1441    ///     .into_keyed()
1442    ///     .first()
1443    /// #   .entries()
1444    /// # }, |mut stream| async move {
1445    /// // Output: { 0: 2, 1: 3 }
1446    /// # for w in vec![(0, 2), (1, 3)] {
1447    /// #     assert_eq!(stream.next().await.unwrap(), w);
1448    /// # }
1449    /// # }));
1450    /// # }
1451    /// ```
1452    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1453    where
1454        K: Clone + Eq + Hash,
1455    {
1456        self.fold_early_stop(
1457            q!(|| None),
1458            q!(|acc, v| {
1459                *acc = Some(v);
1460                true
1461            }),
1462        )
1463        .map(q!(|v| v.unwrap()))
1464    }
1465
1466    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1467    ///
1468    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1469    /// to depend on the order of elements in the group.
1470    ///
1471    /// If the input and output value types are the same and do not require initialization then use
1472    /// [`KeyedStream::reduce`].
1473    ///
1474    /// # Example
1475    /// ```rust
1476    /// # #[cfg(feature = "deploy")] {
1477    /// # use hydro_lang::prelude::*;
1478    /// # use futures::StreamExt;
1479    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1480    /// let tick = process.tick();
1481    /// let numbers = process
1482    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1483    ///     .into_keyed();
1484    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1485    /// batch
1486    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
1487    ///     .entries()
1488    ///     .all_ticks()
1489    /// # }, |mut stream| async move {
1490    /// // (1, 5), (2, 7)
1491    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1492    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1493    /// # }));
1494    /// # }
1495    /// ```
1496    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1497        self,
1498        init: impl IntoQuotedMut<'a, I, L>,
1499        comb: impl IntoQuotedMut<'a, F, L>,
1500    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1501    where
1502        K: Eq + Hash,
1503    {
1504        let init = init.splice_fn0_ctx(&self.location).into();
1505        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1506
1507        KeyedSingleton::new(
1508            self.location.clone(),
1509            HydroNode::FoldKeyed {
1510                init,
1511                acc: comb,
1512                input: Box::new(self.ir_node.into_inner()),
1513                metadata: self.location.new_node_metadata(KeyedSingleton::<
1514                    K,
1515                    A,
1516                    L,
1517                    B::WhenValueUnbounded,
1518                >::collection_kind()),
1519            },
1520        )
1521    }
1522
1523    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1524    ///
1525    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1526    /// to depend on the order of elements in the stream.
1527    ///
1528    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1529    ///
1530    /// # Example
1531    /// ```rust
1532    /// # #[cfg(feature = "deploy")] {
1533    /// # use hydro_lang::prelude::*;
1534    /// # use futures::StreamExt;
1535    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1536    /// let tick = process.tick();
1537    /// let numbers = process
1538    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1539    ///     .into_keyed();
1540    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1541    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1542    /// # }, |mut stream| async move {
1543    /// // (1, 5), (2, 7)
1544    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1545    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1546    /// # }));
1547    /// # }
1548    /// ```
1549    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1550        self,
1551        comb: impl IntoQuotedMut<'a, F, L>,
1552    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1553    where
1554        K: Eq + Hash,
1555    {
1556        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1557
1558        KeyedSingleton::new(
1559            self.location.clone(),
1560            HydroNode::ReduceKeyed {
1561                f,
1562                input: Box::new(self.ir_node.into_inner()),
1563                metadata: self.location.new_node_metadata(KeyedSingleton::<
1564                    K,
1565                    V,
1566                    L,
1567                    B::WhenValueUnbounded,
1568                >::collection_kind()),
1569            },
1570        )
1571    }
1572
1573    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1574    ///
1575    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1576    /// to depend on the order of elements in the stream.
1577    ///
1578    /// # Example
1579    /// ```rust
1580    /// # #[cfg(feature = "deploy")] {
1581    /// # use hydro_lang::prelude::*;
1582    /// # use futures::StreamExt;
1583    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1584    /// let tick = process.tick();
1585    /// let watermark = tick.singleton(q!(1));
1586    /// let numbers = process
1587    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1588    ///     .into_keyed();
1589    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1590    /// batch
1591    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1592    ///     .entries()
1593    ///     .all_ticks()
1594    /// # }, |mut stream| async move {
1595    /// // (2, 204)
1596    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1597    /// # }));
1598    /// # }
1599    /// ```
1600    pub fn reduce_watermark<O, F>(
1601        self,
1602        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1603        comb: impl IntoQuotedMut<'a, F, L>,
1604    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1605    where
1606        K: Eq + Hash,
1607        O: Clone,
1608        F: Fn(&mut V, V) + 'a,
1609    {
1610        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1611        check_matching_location(&self.location.root(), other.location.outer());
1612        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1613
1614        KeyedSingleton::new(
1615            self.location.clone(),
1616            HydroNode::ReduceKeyedWatermark {
1617                f,
1618                input: Box::new(self.ir_node.into_inner()),
1619                watermark: Box::new(other.ir_node.into_inner()),
1620                metadata: self.location.new_node_metadata(KeyedSingleton::<
1621                    K,
1622                    V,
1623                    L,
1624                    B::WhenValueUnbounded,
1625                >::collection_kind()),
1626            },
1627        )
1628    }
1629}
1630
1631impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1632where
1633    L: Location<'a>,
1634{
1635    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1636    ///
1637    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1638    ///
1639    /// If the input and output value types are the same and do not require initialization then use
1640    /// [`KeyedStream::reduce_commutative`].
1641    ///
1642    /// # Example
1643    /// ```rust
1644    /// # #[cfg(feature = "deploy")] {
1645    /// # use hydro_lang::prelude::*;
1646    /// # use futures::StreamExt;
1647    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1648    /// let tick = process.tick();
1649    /// let numbers = process
1650    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1651    ///     .into_keyed();
1652    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1653    /// batch
1654    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1655    ///     .entries()
1656    ///     .all_ticks()
1657    /// # }, |mut stream| async move {
1658    /// // (1, 5), (2, 7)
1659    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1660    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1661    /// # }));
1662    /// # }
1663    /// ```
1664    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1665        self,
1666        init: impl IntoQuotedMut<'a, I, L>,
1667        comb: impl IntoQuotedMut<'a, F, L>,
1668    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1669    where
1670        K: Eq + Hash,
1671    {
1672        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1673            .fold(init, comb)
1674    }
1675
1676    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1677    ///
1678    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1679    ///
1680    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1681    ///
1682    /// # Example
1683    /// ```rust
1684    /// # #[cfg(feature = "deploy")] {
1685    /// # use hydro_lang::prelude::*;
1686    /// # use futures::StreamExt;
1687    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1688    /// let tick = process.tick();
1689    /// let numbers = process
1690    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1691    ///     .into_keyed();
1692    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1693    /// batch
1694    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1695    ///     .entries()
1696    ///     .all_ticks()
1697    /// # }, |mut stream| async move {
1698    /// // (1, 5), (2, 7)
1699    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1700    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1701    /// # }));
1702    /// # }
1703    /// ```
1704    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1705        self,
1706        comb: impl IntoQuotedMut<'a, F, L>,
1707    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1708    where
1709        K: Eq + Hash,
1710    {
1711        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1712            .reduce(comb)
1713    }
1714
1715    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1716    ///
1717    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1718    ///
1719    /// # Example
1720    /// ```rust
1721    /// # #[cfg(feature = "deploy")] {
1722    /// # use hydro_lang::prelude::*;
1723    /// # use futures::StreamExt;
1724    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1725    /// let tick = process.tick();
1726    /// let watermark = tick.singleton(q!(1));
1727    /// let numbers = process
1728    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1729    ///     .into_keyed();
1730    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1731    /// batch
1732    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1733    ///     .entries()
1734    ///     .all_ticks()
1735    /// # }, |mut stream| async move {
1736    /// // (2, 204)
1737    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1738    /// # }));
1739    /// # }
1740    /// ```
1741    pub fn reduce_watermark_commutative<O2, F>(
1742        self,
1743        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1744        comb: impl IntoQuotedMut<'a, F, L>,
1745    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1746    where
1747        K: Eq + Hash,
1748        O2: Clone,
1749        F: Fn(&mut V, V) + 'a,
1750    {
1751        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1752            .reduce_watermark(other, comb)
1753    }
1754
1755    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1756    ///
1757    /// # Example
1758    /// ```rust
1759    /// # #[cfg(feature = "deploy")] {
1760    /// # use hydro_lang::prelude::*;
1761    /// # use futures::StreamExt;
1762    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1763    /// let tick = process.tick();
1764    /// let numbers = process
1765    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1766    ///     .into_keyed();
1767    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1768    /// batch
1769    ///     .value_counts()
1770    ///     .entries()
1771    ///     .all_ticks()
1772    /// # }, |mut stream| async move {
1773    /// // (1, 3), (2, 2)
1774    /// # assert_eq!(stream.next().await.unwrap(), (1, 3));
1775    /// # assert_eq!(stream.next().await.unwrap(), (2, 2));
1776    /// # }));
1777    /// # }
1778    /// ```
1779    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1780    where
1781        K: Eq + Hash,
1782    {
1783        self.assume_ordering_trusted(
1784            nondet!(/** ordering within each group affects neither result nor intermediates */),
1785        )
1786        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1787    }
1788}
1789
1790impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1791where
1792    L: Location<'a>,
1793{
1794    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1795    ///
1796    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1797    ///
1798    /// If the input and output value types are the same and do not require initialization then use
1799    /// [`KeyedStream::reduce_idempotent`].
1800    ///
1801    /// # Example
1802    /// ```rust
1803    /// # #[cfg(feature = "deploy")] {
1804    /// # use hydro_lang::prelude::*;
1805    /// # use futures::StreamExt;
1806    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1807    /// let tick = process.tick();
1808    /// let numbers = process
1809    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1810    ///     .into_keyed();
1811    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1812    /// batch
1813    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1814    ///     .entries()
1815    ///     .all_ticks()
1816    /// # }, |mut stream| async move {
1817    /// // (1, false), (2, true)
1818    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1819    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1820    /// # }));
1821    /// # }
1822    /// ```
1823    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1824        self,
1825        init: impl IntoQuotedMut<'a, I, L>,
1826        comb: impl IntoQuotedMut<'a, F, L>,
1827    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1828    where
1829        K: Eq + Hash,
1830    {
1831        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1832            .fold(init, comb)
1833    }
1834
1835    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1836    ///
1837    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1838    ///
1839    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1840    ///
1841    /// # Example
1842    /// ```rust
1843    /// # #[cfg(feature = "deploy")] {
1844    /// # use hydro_lang::prelude::*;
1845    /// # use futures::StreamExt;
1846    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1847    /// let tick = process.tick();
1848    /// let numbers = process
1849    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1850    ///     .into_keyed();
1851    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1852    /// batch
1853    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1854    ///     .entries()
1855    ///     .all_ticks()
1856    /// # }, |mut stream| async move {
1857    /// // (1, false), (2, true)
1858    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1859    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1860    /// # }));
1861    /// # }
1862    /// ```
1863    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1864        self,
1865        comb: impl IntoQuotedMut<'a, F, L>,
1866    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1867    where
1868        K: Eq + Hash,
1869    {
1870        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1871            .reduce(comb)
1872    }
1873
1874    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1875    ///
1876    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1877    ///
1878    /// # Example
1879    /// ```rust
1880    /// # #[cfg(feature = "deploy")] {
1881    /// # use hydro_lang::prelude::*;
1882    /// # use futures::StreamExt;
1883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1884    /// let tick = process.tick();
1885    /// let watermark = tick.singleton(q!(1));
1886    /// let numbers = process
1887    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1888    ///     .into_keyed();
1889    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1890    /// batch
1891    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1892    ///     .entries()
1893    ///     .all_ticks()
1894    /// # }, |mut stream| async move {
1895    /// // (2, true)
1896    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1897    /// # }));
1898    /// # }
1899    /// ```
1900    pub fn reduce_watermark_idempotent<O2, F>(
1901        self,
1902        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1903        comb: impl IntoQuotedMut<'a, F, L>,
1904    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1905    where
1906        K: Eq + Hash,
1907        O2: Clone,
1908        F: Fn(&mut V, V) + 'a,
1909    {
1910        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1911            .reduce_watermark(other, comb)
1912    }
1913}
1914
1915impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1916where
1917    L: Location<'a>,
1918{
1919    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1920    ///
1921    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1922    /// as there may be non-deterministic duplicates.
1923    ///
1924    /// If the input and output value types are the same and do not require initialization then use
1925    /// [`KeyedStream::reduce_commutative_idempotent`].
1926    ///
1927    /// # Example
1928    /// ```rust
1929    /// # #[cfg(feature = "deploy")] {
1930    /// # use hydro_lang::prelude::*;
1931    /// # use futures::StreamExt;
1932    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1933    /// let tick = process.tick();
1934    /// let numbers = process
1935    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1936    ///     .into_keyed();
1937    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1938    /// batch
1939    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1940    ///     .entries()
1941    ///     .all_ticks()
1942    /// # }, |mut stream| async move {
1943    /// // (1, false), (2, true)
1944    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1945    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1946    /// # }));
1947    /// # }
1948    /// ```
1949    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1950        self,
1951        init: impl IntoQuotedMut<'a, I, L>,
1952        comb: impl IntoQuotedMut<'a, F, L>,
1953    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1954    where
1955        K: Eq + Hash,
1956    {
1957        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1958            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1959            .fold(init, comb)
1960    }
1961
1962    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1963    ///
1964    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1965    /// as there may be non-deterministic duplicates.
1966    ///
1967    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1968    ///
1969    /// # Example
1970    /// ```rust
1971    /// # #[cfg(feature = "deploy")] {
1972    /// # use hydro_lang::prelude::*;
1973    /// # use futures::StreamExt;
1974    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1975    /// let tick = process.tick();
1976    /// let numbers = process
1977    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1978    ///     .into_keyed();
1979    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1980    /// batch
1981    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1982    ///     .entries()
1983    ///     .all_ticks()
1984    /// # }, |mut stream| async move {
1985    /// // (1, false), (2, true)
1986    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1987    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1988    /// # }));
1989    /// # }
1990    /// ```
1991    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1992        self,
1993        comb: impl IntoQuotedMut<'a, F, L>,
1994    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1995    where
1996        K: Eq + Hash,
1997    {
1998        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1999            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2000            .reduce(comb)
2001    }
2002
2003    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
2004    ///
2005    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2006    /// as there may be non-deterministic duplicates.
2007    ///
2008    /// # Example
2009    /// ```rust
2010    /// # #[cfg(feature = "deploy")] {
2011    /// # use hydro_lang::prelude::*;
2012    /// # use futures::StreamExt;
2013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2014    /// let tick = process.tick();
2015    /// let watermark = tick.singleton(q!(1));
2016    /// let numbers = process
2017    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
2018    ///     .into_keyed();
2019    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2020    /// batch
2021    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
2022    ///     .entries()
2023    ///     .all_ticks()
2024    /// # }, |mut stream| async move {
2025    /// // (2, true)
2026    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2027    /// # }));
2028    /// # }
2029    /// ```
2030    pub fn reduce_watermark_commutative_idempotent<O2, F>(
2031        self,
2032        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
2033        comb: impl IntoQuotedMut<'a, F, L>,
2034    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
2035    where
2036        K: Eq + Hash,
2037        O2: Clone,
2038        F: Fn(&mut V, V) + 'a,
2039    {
2040        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
2041            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2042            .reduce_watermark(other, comb)
2043    }
2044
2045    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2046    /// whose keys are not in the bounded stream.
2047    ///
2048    /// # Example
2049    /// ```rust
2050    /// # #[cfg(feature = "deploy")] {
2051    /// # use hydro_lang::prelude::*;
2052    /// # use futures::StreamExt;
2053    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2054    /// let tick = process.tick();
2055    /// let keyed_stream = process
2056    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2057    ///     .batch(&tick, nondet!(/** test */))
2058    ///     .into_keyed();
2059    /// let keys_to_remove = process
2060    ///     .source_iter(q!(vec![1, 2]))
2061    ///     .batch(&tick, nondet!(/** test */));
2062    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2063    /// #   .entries()
2064    /// # }, |mut stream| async move {
2065    /// // { 3: ['c'], 4: ['d'] }
2066    /// # for w in vec![(3, 'c'), (4, 'd')] {
2067    /// #     assert_eq!(stream.next().await.unwrap(), w);
2068    /// # }
2069    /// # }));
2070    /// # }
2071    /// ```
2072    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2073        self,
2074        other: Stream<K, L, Bounded, O2, R2>,
2075    ) -> Self
2076    where
2077        K: Eq + Hash,
2078    {
2079        check_matching_location(&self.location, &other.location);
2080
2081        KeyedStream::new(
2082            self.location.clone(),
2083            HydroNode::AntiJoin {
2084                pos: Box::new(self.ir_node.into_inner()),
2085                neg: Box::new(other.ir_node.into_inner()),
2086                metadata: self.location.new_node_metadata(Self::collection_kind()),
2087            },
2088        )
2089    }
2090}
2091
2092impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
2093where
2094    L: Location<'a>,
2095{
2096    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2097    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2098    ///
2099    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2100    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2101    /// argument that declares where the stream will be atomically processed. Batching a stream into
2102    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2103    /// [`Tick`] will introduce asynchrony.
2104    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2105        let out_location = Atomic { tick: tick.clone() };
2106        KeyedStream::new(
2107            out_location.clone(),
2108            HydroNode::BeginAtomic {
2109                inner: Box::new(self.ir_node.into_inner()),
2110                metadata: out_location
2111                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2112            },
2113        )
2114    }
2115
2116    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2117    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2118    /// the order of the input.
2119    ///
2120    /// # Non-Determinism
2121    /// The batch boundaries are non-deterministic and may change across executions.
2122    pub fn batch(
2123        self,
2124        tick: &Tick<L>,
2125        nondet: NonDet,
2126    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2127        let _ = nondet;
2128        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2129        KeyedStream::new(
2130            tick.clone(),
2131            HydroNode::Batch {
2132                inner: Box::new(self.ir_node.into_inner()),
2133                metadata: tick.new_node_metadata(
2134                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2135                ),
2136            },
2137        )
2138    }
2139}
2140
2141impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2142where
2143    L: Location<'a> + NoTick,
2144{
2145    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2146    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2147    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2148    /// used to create the atomic section.
2149    ///
2150    /// # Non-Determinism
2151    /// The batch boundaries are non-deterministic and may change across executions.
2152    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2153        let _ = nondet;
2154        KeyedStream::new(
2155            self.location.clone().tick,
2156            HydroNode::Batch {
2157                inner: Box::new(self.ir_node.into_inner()),
2158                metadata: self.location.tick.new_node_metadata(KeyedStream::<
2159                    K,
2160                    V,
2161                    Tick<L>,
2162                    Bounded,
2163                    O,
2164                    R,
2165                >::collection_kind(
2166                )),
2167            },
2168        )
2169    }
2170
2171    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2172    /// See [`KeyedStream::atomic`] for more details.
2173    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2174        KeyedStream::new(
2175            self.location.tick.l.clone(),
2176            HydroNode::EndAtomic {
2177                inner: Box::new(self.ir_node.into_inner()),
2178                metadata: self
2179                    .location
2180                    .tick
2181                    .l
2182                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2183            },
2184        )
2185    }
2186}
2187
2188impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
2189where
2190    L: Location<'a>,
2191{
2192    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2193    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2194    /// is only present in one of the inputs, its values are passed through as-is). The output has
2195    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2196    ///
2197    /// Currently, both input streams must be [`Bounded`]. This operator will block
2198    /// on the first stream until all its elements are available. In a future version,
2199    /// we will relax the requirement on the `other` stream.
2200    ///
2201    /// # Example
2202    /// ```rust
2203    /// # #[cfg(feature = "deploy")] {
2204    /// # use hydro_lang::prelude::*;
2205    /// # use futures::StreamExt;
2206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207    /// let tick = process.tick();
2208    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2209    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2210    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2211    /// # .entries()
2212    /// # }, |mut stream| async move {
2213    /// // { 0: [2, 1], 1: [4, 3] }
2214    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
2215    /// #     assert_eq!(stream.next().await.unwrap(), w);
2216    /// # }
2217    /// # }));
2218    /// # }
2219    /// ```
2220    pub fn chain<O2: Ordering, R2: Retries>(
2221        self,
2222        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2223    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2224    where
2225        O: MinOrder<O2>,
2226        R: MinRetries<R2>,
2227    {
2228        check_matching_location(&self.location, &other.location);
2229
2230        KeyedStream::new(
2231            self.location.clone(),
2232            HydroNode::Chain {
2233                first: Box::new(self.ir_node.into_inner()),
2234                second: Box::new(other.ir_node.into_inner()),
2235                metadata: self.location.new_node_metadata(KeyedStream::<
2236                    K,
2237                    V,
2238                    L,
2239                    Bounded,
2240                    <O as MinOrder<O2>>::Min,
2241                    <R as MinRetries<R2>>::Min,
2242                >::collection_kind()),
2243            },
2244        )
2245    }
2246}
2247
2248impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2249where
2250    L: Location<'a>,
2251{
2252    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2253    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2254    /// each key.
2255    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2256        KeyedStream::new(
2257            self.location.outer().clone(),
2258            HydroNode::YieldConcat {
2259                inner: Box::new(self.ir_node.into_inner()),
2260                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2261                    K,
2262                    V,
2263                    L,
2264                    Unbounded,
2265                    O,
2266                    R,
2267                >::collection_kind(
2268                )),
2269            },
2270        )
2271    }
2272
2273    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2274    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2275    /// each key.
2276    ///
2277    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2278    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2279    /// stream's [`Tick`] context.
2280    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2281        let out_location = Atomic {
2282            tick: self.location.clone(),
2283        };
2284
2285        KeyedStream::new(
2286            out_location.clone(),
2287            HydroNode::YieldConcat {
2288                inner: Box::new(self.ir_node.into_inner()),
2289                metadata: out_location.new_node_metadata(KeyedStream::<
2290                    K,
2291                    V,
2292                    Atomic<L>,
2293                    Unbounded,
2294                    O,
2295                    R,
2296                >::collection_kind()),
2297            },
2298        )
2299    }
2300
2301    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2302    /// tick `T` always has the entries of `self` at tick `T - 1`.
2303    ///
2304    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2305    ///
2306    /// This operator enables stateful iterative processing with ticks, by sending data from one
2307    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2308    ///
2309    /// # Example
2310    /// ```rust
2311    /// # #[cfg(feature = "deploy")] {
2312    /// # use hydro_lang::prelude::*;
2313    /// # use futures::StreamExt;
2314    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2315    /// let tick = process.tick();
2316    /// # // ticks are lazy by default, forces the second tick to run
2317    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2318    /// # let batch_first_tick = process
2319    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2320    /// #   .batch(&tick, nondet!(/** test */))
2321    /// #   .into_keyed();
2322    /// # let batch_second_tick = process
2323    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2324    /// #   .batch(&tick, nondet!(/** test */))
2325    /// #   .defer_tick()
2326    /// #   .into_keyed(); // appears on the second tick
2327    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2328    /// # batch_first_tick.chain(batch_second_tick);
2329    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2330    ///     changes_across_ticks // from the current tick
2331    /// )
2332    /// # .entries().all_ticks()
2333    /// # }, |mut stream| async move {
2334    /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2335    /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2336    /// #     assert_eq!(stream.next().await.unwrap(), w);
2337    /// # }
2338    /// # }));
2339    /// # }
2340    /// ```
2341    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2342        KeyedStream::new(
2343            self.location.clone(),
2344            HydroNode::DeferTick {
2345                input: Box::new(self.ir_node.into_inner()),
2346                metadata: self.location.new_node_metadata(KeyedStream::<
2347                    K,
2348                    V,
2349                    Tick<L>,
2350                    Bounded,
2351                    O,
2352                    R,
2353                >::collection_kind()),
2354            },
2355        )
2356    }
2357}
2358
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// With a fixed watermark of `1`, the first snapshot entry emitted by `reduce_watermark`
    /// is expected to be key `2`, with its two values summed (102 + 102 = 204).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    /// The watermark starts at `1` and is incremented by one on every tick via a tick cycle.
    /// Key `3` is injected only after an external trigger arrives; the test checks that it is
    /// still reduced and emitted after the initial output for key `2`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        let tick_triggered_input = node
            .source_iter(q!([(3, 103)]))
            .batch(&node_tick, nondet!(/** test */))
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    /// Batch boundaries are nondeterministic, so a per-batch `fold` is not guaranteed to see a
    /// whole group at once. The assertion lists every group, so it holds whenever each group
    /// arrives in a single batch. The test panics only if the exhaustive simulator finds an
    /// execution that splits `(1, 1)` and `(1, 2)` across batches.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            // Key `2` must be listed too; otherwise the assertion fails in every execution and
            // the `should_panic` would pass without exercising batch-size nondeterminism.
            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2]), (2, vec![3])])
                .await;
        });
    }

    /// With a totally ordered input, only the batch boundaries vary between executions; values
    /// within a group are not permuted. This yields the 8 executions enumerated below.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    /// After `weakest_ordering`, the simulator also explores the order of values within a
    /// group when they land in separate batches. This yields 13 executions instead of 8.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weakest_ordering();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }

    /// Asserting a total order on unordered input via `assume_ordering` is unsound. The
    /// simulator must find an execution where `first()` picks a different value per key,
    /// which makes the assertion (and thus the test) panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }

    /// Pins the number of executions the simulator explores for `assume_ordering` over a
    /// batched, unordered keyed input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
}