hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::batch_atomic::BatchAtomic;
23use crate::live_collections::stream::{AtLeastOnce, Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
31
32pub mod networking;
33
34/// Streaming elements of type `V` grouped by a key of type `K`.
35///
36/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
37/// order of keys is non-deterministic but the order *within* each group may be deterministic.
38///
39/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
40/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
41/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
42///
43/// Type Parameters:
44/// - `K`: the type of the key for each group
45/// - `V`: the type of the elements inside each group
46/// - `Loc`: the [`Location`] where the keyed stream is materialized
47/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
48/// - `Order`: tracks whether the elements within each group have deterministic order
49///   ([`TotalOrder`]) or not ([`NoOrder`])
50/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
51///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
52pub struct KeyedStream<
53    K,
54    V,
55    Loc,
56    Bound: Boundedness = Unbounded,
57    Order: Ordering = TotalOrder,
58    Retry: Retries = ExactlyOnce,
59> {
60    pub(crate) location: Loc,
61    pub(crate) ir_node: RefCell<HydroNode>,
62
63    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
64}
65
66impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
67    for KeyedStream<K, V, L, Unbounded, O, R>
68where
69    L: Location<'a>,
70{
71    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
72        let new_meta = stream
73            .location
74            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
75
76        KeyedStream {
77            location: stream.location,
78            ir_node: RefCell::new(HydroNode::Cast {
79                inner: Box::new(stream.ir_node.into_inner()),
80                metadata: new_meta,
81            }),
82            _phantom: PhantomData,
83        }
84    }
85}
86
87impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
88    for KeyedStream<K, V, L, B, NoOrder, R>
89where
90    L: Location<'a>,
91{
92    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
93        stream.weakest_ordering()
94    }
95}
96
97impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
98where
99    L: Location<'a>,
100{
101    fn defer_tick(self) -> Self {
102        KeyedStream::defer_tick(self)
103    }
104}
105
106impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
107    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
108where
109    L: Location<'a>,
110{
111    type Location = Tick<L>;
112
113    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
114        KeyedStream {
115            location: location.clone(),
116            ir_node: RefCell::new(HydroNode::CycleSource {
117                ident,
118                metadata: location.new_node_metadata(
119                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
120                ),
121            }),
122            _phantom: PhantomData,
123        }
124    }
125}
126
127impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
128    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
129where
130    L: Location<'a>,
131{
132    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
133        assert_eq!(
134            Location::id(&self.location),
135            expected_location,
136            "locations do not match"
137        );
138
139        self.location
140            .flow_state()
141            .borrow_mut()
142            .push_root(HydroRoot::CycleSink {
143                ident,
144                input: Box::new(self.ir_node.into_inner()),
145                op_metadata: HydroIrOpMetadata::new(),
146            });
147    }
148}
149
150impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
151    for KeyedStream<K, V, L, B, O, R>
152where
153    L: Location<'a> + NoTick,
154{
155    type Location = L;
156
157    fn create_source(ident: syn::Ident, location: L) -> Self {
158        KeyedStream {
159            location: location.clone(),
160            ir_node: RefCell::new(HydroNode::CycleSource {
161                ident,
162                metadata: location
163                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
164            }),
165            _phantom: PhantomData,
166        }
167    }
168}
169
170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
171    for KeyedStream<K, V, L, B, O, R>
172where
173    L: Location<'a> + NoTick,
174{
175    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
176        assert_eq!(
177            Location::id(&self.location),
178            expected_location,
179            "locations do not match"
180        );
181        self.location
182            .flow_state()
183            .borrow_mut()
184            .push_root(HydroRoot::CycleSink {
185                ident,
186                input: Box::new(self.ir_node.into_inner()),
187                op_metadata: HydroIrOpMetadata::new(),
188            });
189    }
190}
191
192impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
193    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
194{
195    fn clone(&self) -> Self {
196        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198            *self.ir_node.borrow_mut() = HydroNode::Tee {
199                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
200                metadata: self.location.new_node_metadata(Self::collection_kind()),
201            };
202        }
203
204        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205            KeyedStream {
206                location: self.location.clone(),
207                ir_node: HydroNode::Tee {
208                    inner: TeeNode(inner.0.clone()),
209                    metadata: metadata.clone(),
210                }
211                .into(),
212                _phantom: PhantomData,
213            }
214        } else {
215            unreachable!()
216        }
217    }
218}
219
220impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
221    KeyedStream<K, V, L, B, O, R>
222{
223    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
224        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
225        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
226
227        KeyedStream {
228            location,
229            ir_node: RefCell::new(ir_node),
230            _phantom: PhantomData,
231        }
232    }
233
234    /// Returns the [`CollectionKind`] corresponding to this type.
235    pub fn collection_kind() -> CollectionKind {
236        CollectionKind::KeyedStream {
237            bound: B::BOUND_KIND,
238            value_order: O::ORDERING_KIND,
239            value_retry: R::RETRIES_KIND,
240            key_type: stageleft::quote_type::<K>().into(),
241            value_type: stageleft::quote_type::<V>().into(),
242        }
243    }
244
245    /// Returns the [`Location`] where this keyed stream is being materialized.
246    pub fn location(&self) -> &L {
247        &self.location
248    }
249
250    /// Explicitly "casts" the keyed stream to a type with a different ordering
251    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
252    /// by the type-system.
253    ///
254    /// # Non-Determinism
255    /// This function is used as an escape hatch, and any mistakes in the
256    /// provided ordering guarantee will propagate into the guarantees
257    /// for the rest of the program.
258    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
259        if O::ORDERING_KIND == O2::ORDERING_KIND {
260            KeyedStream::new(self.location, self.ir_node.into_inner())
261        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
262            // We can always weaken the ordering guarantee
263            KeyedStream::new(
264                self.location.clone(),
265                HydroNode::Cast {
266                    inner: Box::new(self.ir_node.into_inner()),
267                    metadata: self
268                        .location
269                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
270                },
271            )
272        } else {
273            KeyedStream::new(
274                self.location.clone(),
275                HydroNode::ObserveNonDet {
276                    inner: Box::new(self.ir_node.into_inner()),
277                    trusted: false,
278                    metadata: self
279                        .location
280                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
281                },
282            )
283        }
284    }
285
286    fn assume_ordering_trusted<O2: Ordering>(
287        self,
288        _nondet: NonDet,
289    ) -> KeyedStream<K, V, L, B, O2, R> {
290        if O::ORDERING_KIND == O2::ORDERING_KIND {
291            KeyedStream::new(self.location, self.ir_node.into_inner())
292        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
293            // We can always weaken the ordering guarantee
294            KeyedStream::new(
295                self.location.clone(),
296                HydroNode::Cast {
297                    inner: Box::new(self.ir_node.into_inner()),
298                    metadata: self
299                        .location
300                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
301                },
302            )
303        } else {
304            KeyedStream::new(
305                self.location.clone(),
306                HydroNode::ObserveNonDet {
307                    inner: Box::new(self.ir_node.into_inner()),
308                    trusted: true,
309                    metadata: self
310                        .location
311                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
312                },
313            )
314        }
315    }
316
317    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
318    /// which is always safe because that is the weakest possible guarantee.
319    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
320        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
321        self.assume_ordering::<NoOrder>(nondet)
322    }
323
324    /// Explicitly "casts" the keyed stream to a type with a different retries
325    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
326    /// be proven by the type-system.
327    ///
328    /// # Non-Determinism
329    /// This function is used as an escape hatch, and any mistakes in the
330    /// provided retries guarantee will propagate into the guarantees
331    /// for the rest of the program.
332    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
333        if R::RETRIES_KIND == R2::RETRIES_KIND {
334            KeyedStream::new(self.location, self.ir_node.into_inner())
335        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
336            // We can always weaken the retries guarantee
337            KeyedStream::new(
338                self.location.clone(),
339                HydroNode::Cast {
340                    inner: Box::new(self.ir_node.into_inner()),
341                    metadata: self
342                        .location
343                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
344                },
345            )
346        } else {
347            KeyedStream::new(
348                self.location.clone(),
349                HydroNode::ObserveNonDet {
350                    inner: Box::new(self.ir_node.into_inner()),
351                    trusted: false,
352                    metadata: self
353                        .location
354                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
355                },
356            )
357        }
358    }
359
360    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
361    /// which is always safe because that is the weakest possible guarantee.
362    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
363        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
364        self.assume_retries::<AtLeastOnce>(nondet)
365    }
366
367    /// Flattens the keyed stream into an unordered stream of key-value pairs.
368    ///
369    /// # Example
370    /// ```rust
371    /// # #[cfg(feature = "deploy")] {
372    /// # use hydro_lang::prelude::*;
373    /// # use futures::StreamExt;
374    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
375    /// process
376    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
377    ///     .into_keyed()
378    ///     .entries()
379    /// # }, |mut stream| async move {
380    /// // (1, 2), (1, 3), (2, 4) in any order
381    /// # let mut results = Vec::new();
382    /// # for _ in 0..3 {
383    /// #     results.push(stream.next().await.unwrap());
384    /// # }
385    /// # results.sort();
386    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
387    /// # }));
388    /// # }
389    /// ```
390    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
391        Stream::new(
392            self.location.clone(),
393            HydroNode::Cast {
394                inner: Box::new(self.ir_node.into_inner()),
395                metadata: self
396                    .location
397                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
398            },
399        )
400    }
401
402    /// Flattens the keyed stream into an unordered stream of only the values.
403    ///
404    /// # Example
405    /// ```rust
406    /// # #[cfg(feature = "deploy")] {
407    /// # use hydro_lang::prelude::*;
408    /// # use futures::StreamExt;
409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
410    /// process
411    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
412    ///     .into_keyed()
413    ///     .values()
414    /// # }, |mut stream| async move {
415    /// // 2, 3, 4 in any order
416    /// # let mut results = Vec::new();
417    /// # for _ in 0..3 {
418    /// #     results.push(stream.next().await.unwrap());
419    /// # }
420    /// # results.sort();
421    /// # assert_eq!(results, vec![2, 3, 4]);
422    /// # }));
423    /// # }
424    /// ```
425    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
426        self.entries().map(q!(|(_, v)| v))
427    }
428
429    /// Transforms each value by invoking `f` on each element, with keys staying the same
430    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
431    ///
432    /// If you do not want to modify the stream and instead only want to view
433    /// each item use [`KeyedStream::inspect`] instead.
434    ///
435    /// # Example
436    /// ```rust
437    /// # #[cfg(feature = "deploy")] {
438    /// # use hydro_lang::prelude::*;
439    /// # use futures::StreamExt;
440    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
441    /// process
442    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
443    ///     .into_keyed()
444    ///     .map(q!(|v| v + 1))
445    /// #   .entries()
446    /// # }, |mut stream| async move {
447    /// // { 1: [3, 4], 2: [5] }
448    /// # let mut results = Vec::new();
449    /// # for _ in 0..3 {
450    /// #     results.push(stream.next().await.unwrap());
451    /// # }
452    /// # results.sort();
453    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
454    /// # }));
455    /// # }
456    /// ```
457    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
458    where
459        F: Fn(V) -> U + 'a,
460    {
461        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
462        let map_f = q!({
463            let orig = f;
464            move |(k, v)| (k, orig(v))
465        })
466        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
467        .into();
468
469        KeyedStream::new(
470            self.location.clone(),
471            HydroNode::Map {
472                f: map_f,
473                input: Box::new(self.ir_node.into_inner()),
474                metadata: self
475                    .location
476                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
477            },
478        )
479    }
480
481    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
482    /// re-grouped even they are tuples; instead they will be grouped under the original key.
483    ///
484    /// If you do not want to modify the stream and instead only want to view
485    /// each item use [`KeyedStream::inspect_with_key`] instead.
486    ///
487    /// # Example
488    /// ```rust
489    /// # #[cfg(feature = "deploy")] {
490    /// # use hydro_lang::prelude::*;
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
493    /// process
494    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
495    ///     .into_keyed()
496    ///     .map_with_key(q!(|(k, v)| k + v))
497    /// #   .entries()
498    /// # }, |mut stream| async move {
499    /// // { 1: [3, 4], 2: [6] }
500    /// # let mut results = Vec::new();
501    /// # for _ in 0..3 {
502    /// #     results.push(stream.next().await.unwrap());
503    /// # }
504    /// # results.sort();
505    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
506    /// # }));
507    /// # }
508    /// ```
509    pub fn map_with_key<U, F>(
510        self,
511        f: impl IntoQuotedMut<'a, F, L> + Copy,
512    ) -> KeyedStream<K, U, L, B, O, R>
513    where
514        F: Fn((K, V)) -> U + 'a,
515        K: Clone,
516    {
517        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
518        let map_f = q!({
519            let orig = f;
520            move |(k, v)| {
521                let out = orig((Clone::clone(&k), v));
522                (k, out)
523            }
524        })
525        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
526        .into();
527
528        KeyedStream::new(
529            self.location.clone(),
530            HydroNode::Map {
531                f: map_f,
532                input: Box::new(self.ir_node.into_inner()),
533                metadata: self
534                    .location
535                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
536            },
537        )
538    }
539
540    /// Prepends a new value to the key of each element in the stream, producing a new
541    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
542    /// occurs and the elements in each group preserve their original order.
543    ///
544    /// # Example
545    /// ```rust
546    /// # #[cfg(feature = "deploy")] {
547    /// # use hydro_lang::prelude::*;
548    /// # use futures::StreamExt;
549    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
550    /// process
551    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
552    ///     .into_keyed()
553    ///     .prefix_key(q!(|&(k, _)| k % 2))
554    /// #   .entries()
555    /// # }, |mut stream| async move {
556    /// // { (1, 1): [2, 3], (0, 2): [4] }
557    /// # let mut results = Vec::new();
558    /// # for _ in 0..3 {
559    /// #     results.push(stream.next().await.unwrap());
560    /// # }
561    /// # results.sort();
562    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
563    /// # }));
564    /// # }
565    /// ```
566    pub fn prefix_key<K2, F>(
567        self,
568        f: impl IntoQuotedMut<'a, F, L> + Copy,
569    ) -> KeyedStream<(K2, K), V, L, B, O, R>
570    where
571        F: Fn(&(K, V)) -> K2 + 'a,
572    {
573        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
574        let map_f = q!({
575            let orig = f;
576            move |kv| {
577                let out = orig(&kv);
578                ((out, kv.0), kv.1)
579            }
580        })
581        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
582        .into();
583
584        KeyedStream::new(
585            self.location.clone(),
586            HydroNode::Map {
587                f: map_f,
588                input: Box::new(self.ir_node.into_inner()),
589                metadata: self
590                    .location
591                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
592            },
593        )
594    }
595
596    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
597    /// `f`, preserving the order of the elements within the group.
598    ///
599    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
600    /// not modify or take ownership of the values. If you need to modify the values while filtering
601    /// use [`KeyedStream::filter_map`] instead.
602    ///
603    /// # Example
604    /// ```rust
605    /// # #[cfg(feature = "deploy")] {
606    /// # use hydro_lang::prelude::*;
607    /// # use futures::StreamExt;
608    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
609    /// process
610    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
611    ///     .into_keyed()
612    ///     .filter(q!(|&x| x > 2))
613    /// #   .entries()
614    /// # }, |mut stream| async move {
615    /// // { 1: [3], 2: [4] }
616    /// # let mut results = Vec::new();
617    /// # for _ in 0..2 {
618    /// #     results.push(stream.next().await.unwrap());
619    /// # }
620    /// # results.sort();
621    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
622    /// # }));
623    /// # }
624    /// ```
625    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
626    where
627        F: Fn(&V) -> bool + 'a,
628    {
629        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
630        let filter_f = q!({
631            let orig = f;
632            move |t: &(_, _)| orig(&t.1)
633        })
634        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
635        .into();
636
637        KeyedStream::new(
638            self.location.clone(),
639            HydroNode::Filter {
640                f: filter_f,
641                input: Box::new(self.ir_node.into_inner()),
642                metadata: self.location.new_node_metadata(Self::collection_kind()),
643            },
644        )
645    }
646
647    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
648    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
649    ///
650    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
651    /// not modify or take ownership of the values. If you need to modify the values while filtering
652    /// use [`KeyedStream::filter_map_with_key`] instead.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660    /// process
661    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
662    ///     .into_keyed()
663    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
664    /// #   .entries()
665    /// # }, |mut stream| async move {
666    /// // { 1: [3], 2: [4] }
667    /// # let mut results = Vec::new();
668    /// # for _ in 0..2 {
669    /// #     results.push(stream.next().await.unwrap());
670    /// # }
671    /// # results.sort();
672    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
673    /// # }));
674    /// # }
675    /// ```
676    pub fn filter_with_key<F>(
677        self,
678        f: impl IntoQuotedMut<'a, F, L> + Copy,
679    ) -> KeyedStream<K, V, L, B, O, R>
680    where
681        F: Fn(&(K, V)) -> bool + 'a,
682    {
683        let filter_f = f
684            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
685            .into();
686
687        KeyedStream::new(
688            self.location.clone(),
689            HydroNode::Filter {
690                f: filter_f,
691                input: Box::new(self.ir_node.into_inner()),
692                metadata: self.location.new_node_metadata(Self::collection_kind()),
693            },
694        )
695    }
696
697    /// An operator that both filters and maps each value, with keys staying the same.
698    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
699    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
700    ///
701    /// # Example
702    /// ```rust
703    /// # #[cfg(feature = "deploy")] {
704    /// # use hydro_lang::prelude::*;
705    /// # use futures::StreamExt;
706    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707    /// process
708    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
709    ///     .into_keyed()
710    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
711    /// #   .entries()
712    /// # }, |mut stream| async move {
713    /// // { 1: [2], 2: [4] }
714    /// # let mut results = Vec::new();
715    /// # for _ in 0..2 {
716    /// #     results.push(stream.next().await.unwrap());
717    /// # }
718    /// # results.sort();
719    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
720    /// # }));
721    /// # }
722    /// ```
723    pub fn filter_map<U, F>(
724        self,
725        f: impl IntoQuotedMut<'a, F, L> + Copy,
726    ) -> KeyedStream<K, U, L, B, O, R>
727    where
728        F: Fn(V) -> Option<U> + 'a,
729    {
730        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
731        let filter_map_f = q!({
732            let orig = f;
733            move |(k, v)| orig(v).map(|o| (k, o))
734        })
735        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
736        .into();
737
738        KeyedStream::new(
739            self.location.clone(),
740            HydroNode::FilterMap {
741                f: filter_map_f,
742                input: Box::new(self.ir_node.into_inner()),
743                metadata: self
744                    .location
745                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
746            },
747        )
748    }
749
750    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
751    /// re-grouped even they are tuples; instead they will be grouped under the original key.
752    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
753    ///
754    /// # Example
755    /// ```rust
756    /// # #[cfg(feature = "deploy")] {
757    /// # use hydro_lang::prelude::*;
758    /// # use futures::StreamExt;
759    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
760    /// process
761    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
762    ///     .into_keyed()
763    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
764    /// #   .entries()
765    /// # }, |mut stream| async move {
766    /// // { 2: [2] }
767    /// # let mut results = Vec::new();
768    /// # for _ in 0..1 {
769    /// #     results.push(stream.next().await.unwrap());
770    /// # }
771    /// # results.sort();
772    /// # assert_eq!(results, vec![(2, 2)]);
773    /// # }));
774    /// # }
775    /// ```
776    pub fn filter_map_with_key<U, F>(
777        self,
778        f: impl IntoQuotedMut<'a, F, L> + Copy,
779    ) -> KeyedStream<K, U, L, B, O, R>
780    where
781        F: Fn((K, V)) -> Option<U> + 'a,
782        K: Clone,
783    {
784        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
785        let filter_map_f = q!({
786            let orig = f;
787            move |(k, v)| {
788                let out = orig((Clone::clone(&k), v));
789                out.map(|o| (k, o))
790            }
791        })
792        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
793        .into();
794
795        KeyedStream::new(
796            self.location.clone(),
797            HydroNode::FilterMap {
798                f: filter_map_f,
799                input: Box::new(self.ir_node.into_inner()),
800                metadata: self
801                    .location
802                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
803            },
804        )
805    }
806
807    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
808    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
809    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
810    ///
811    /// # Example
812    /// ```rust
813    /// # #[cfg(feature = "deploy")] {
814    /// # use hydro_lang::prelude::*;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817    /// let tick = process.tick();
818    /// let batch = process
819    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
820    ///   .into_keyed()
821    ///   .batch(&tick, nondet!(/** test */));
822    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
823    /// batch.cross_singleton(count).all_ticks().entries()
824    /// # }, |mut stream| async move {
825    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
826    /// # let mut results = Vec::new();
827    /// # for _ in 0..3 {
828    /// #     results.push(stream.next().await.unwrap());
829    /// # }
830    /// # results.sort();
831    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
832    /// # }));
833    /// # }
834    /// ```
835    pub fn cross_singleton<O2>(
836        self,
837        other: impl Into<Optional<O2, L, Bounded>>,
838    ) -> KeyedStream<K, (V, O2), L, B, O, R>
839    where
840        O2: Clone,
841    {
842        let other: Optional<O2, L, Bounded> = other.into();
843        check_matching_location(&self.location, &other.location);
844
845        Stream::new(
846            self.location.clone(),
847            HydroNode::CrossSingleton {
848                left: Box::new(self.ir_node.into_inner()),
849                right: Box::new(other.ir_node.into_inner()),
850                metadata: self
851                    .location
852                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
853            },
854        )
855        .map(q!(|((k, v), o2)| (k, (v, o2))))
856        .into_keyed()
857    }
858
859    /// For each value `v` in each group, transform `v` using `f` and then treat the
860    /// result as an [`Iterator`] to produce values one by one within the same group.
861    /// The implementation for [`Iterator`] for the output type `I` must produce items
862    /// in a **deterministic** order.
863    ///
864    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
865    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
866    ///
867    /// # Example
868    /// ```rust
869    /// # #[cfg(feature = "deploy")] {
870    /// # use hydro_lang::prelude::*;
871    /// # use futures::StreamExt;
872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
873    /// process
874    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
875    ///     .into_keyed()
876    ///     .flat_map_ordered(q!(|x| x))
877    /// #   .entries()
878    /// # }, |mut stream| async move {
879    /// // { 1: [2, 3, 4], 2: [5, 6] }
880    /// # let mut results = Vec::new();
881    /// # for _ in 0..5 {
882    /// #     results.push(stream.next().await.unwrap());
883    /// # }
884    /// # results.sort();
885    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
886    /// # }));
887    /// # }
888    /// ```
889    pub fn flat_map_ordered<U, I, F>(
890        self,
891        f: impl IntoQuotedMut<'a, F, L> + Copy,
892    ) -> KeyedStream<K, U, L, B, O, R>
893    where
894        I: IntoIterator<Item = U>,
895        F: Fn(V) -> I + 'a,
896        K: Clone,
897    {
898        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
899        let flat_map_f = q!({
900            let orig = f;
901            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
902        })
903        .splice_fn1_ctx::<(K, V), _>(&self.location)
904        .into();
905
906        KeyedStream::new(
907            self.location.clone(),
908            HydroNode::FlatMap {
909                f: flat_map_f,
910                input: Box::new(self.ir_node.into_inner()),
911                metadata: self
912                    .location
913                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
914            },
915        )
916    }
917
918    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
919    /// for the output type `I` to produce items in any order.
920    ///
921    /// # Example
922    /// ```rust
923    /// # #[cfg(feature = "deploy")] {
924    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
925    /// # use futures::StreamExt;
926    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
927    /// process
928    ///     .source_iter(q!(vec![
929    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
930    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
931    ///     ]))
932    ///     .into_keyed()
933    ///     .flat_map_unordered(q!(|x| x))
934    /// #   .entries()
935    /// # }, |mut stream| async move {
936    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
937    /// # let mut results = Vec::new();
938    /// # for _ in 0..4 {
939    /// #     results.push(stream.next().await.unwrap());
940    /// # }
941    /// # results.sort();
942    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
943    /// # }));
944    /// # }
945    /// ```
946    pub fn flat_map_unordered<U, I, F>(
947        self,
948        f: impl IntoQuotedMut<'a, F, L> + Copy,
949    ) -> KeyedStream<K, U, L, B, NoOrder, R>
950    where
951        I: IntoIterator<Item = U>,
952        F: Fn(V) -> I + 'a,
953        K: Clone,
954    {
955        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
956        let flat_map_f = q!({
957            let orig = f;
958            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
959        })
960        .splice_fn1_ctx::<(K, V), _>(&self.location)
961        .into();
962
963        KeyedStream::new(
964            self.location.clone(),
965            HydroNode::FlatMap {
966                f: flat_map_f,
967                input: Box::new(self.ir_node.into_inner()),
968                metadata: self
969                    .location
970                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
971            },
972        )
973    }
974
975    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
976    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
977    /// items in a **deterministic** order.
978    ///
979    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
980    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
981    ///
982    /// # Example
983    /// ```rust
984    /// # #[cfg(feature = "deploy")] {
985    /// # use hydro_lang::prelude::*;
986    /// # use futures::StreamExt;
987    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
988    /// process
989    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
990    ///     .into_keyed()
991    ///     .flatten_ordered()
992    /// #   .entries()
993    /// # }, |mut stream| async move {
994    /// // { 1: [2, 3, 4], 2: [5, 6] }
995    /// # let mut results = Vec::new();
996    /// # for _ in 0..5 {
997    /// #     results.push(stream.next().await.unwrap());
998    /// # }
999    /// # results.sort();
1000    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1001    /// # }));
1002    /// # }
1003    /// ```
1004    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1005    where
1006        V: IntoIterator<Item = U>,
1007        K: Clone,
1008    {
1009        self.flat_map_ordered(q!(|d| d))
1010    }
1011
1012    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1013    /// for the value type `V` to produce items in any order.
1014    ///
1015    /// # Example
1016    /// ```rust
1017    /// # #[cfg(feature = "deploy")] {
1018    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1019    /// # use futures::StreamExt;
1020    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1021    /// process
1022    ///     .source_iter(q!(vec![
1023    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1024    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1025    ///     ]))
1026    ///     .into_keyed()
1027    ///     .flatten_unordered()
1028    /// #   .entries()
1029    /// # }, |mut stream| async move {
1030    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1031    /// # let mut results = Vec::new();
1032    /// # for _ in 0..4 {
1033    /// #     results.push(stream.next().await.unwrap());
1034    /// # }
1035    /// # results.sort();
1036    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1037    /// # }));
1038    /// # }
1039    /// ```
1040    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1041    where
1042        V: IntoIterator<Item = U>,
1043        K: Clone,
1044    {
1045        self.flat_map_unordered(q!(|d| d))
1046    }
1047
1048    /// An operator which allows you to "inspect" each element of a stream without
1049    /// modifying it. The closure `f` is called on a reference to each value. This is
1050    /// mainly useful for debugging, and should not be used to generate side-effects.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// process
1059    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1060    ///     .into_keyed()
1061    ///     .inspect(q!(|v| println!("{}", v)))
1062    /// #   .entries()
1063    /// # }, |mut stream| async move {
1064    /// # let mut results = Vec::new();
1065    /// # for _ in 0..3 {
1066    /// #     results.push(stream.next().await.unwrap());
1067    /// # }
1068    /// # results.sort();
1069    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1070    /// # }));
1071    /// # }
1072    /// ```
1073    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1074    where
1075        F: Fn(&V) + 'a,
1076    {
1077        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1078        let inspect_f = q!({
1079            let orig = f;
1080            move |t: &(_, _)| orig(&t.1)
1081        })
1082        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1083        .into();
1084
1085        KeyedStream::new(
1086            self.location.clone(),
1087            HydroNode::Inspect {
1088                f: inspect_f,
1089                input: Box::new(self.ir_node.into_inner()),
1090                metadata: self.location.new_node_metadata(Self::collection_kind()),
1091            },
1092        )
1093    }
1094
1095    /// An operator which allows you to "inspect" each element of a stream without
1096    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1097    /// mainly useful for debugging, and should not be used to generate side-effects.
1098    ///
1099    /// # Example
1100    /// ```rust
1101    /// # #[cfg(feature = "deploy")] {
1102    /// # use hydro_lang::prelude::*;
1103    /// # use futures::StreamExt;
1104    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105    /// process
1106    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1107    ///     .into_keyed()
1108    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1109    /// #   .entries()
1110    /// # }, |mut stream| async move {
1111    /// # let mut results = Vec::new();
1112    /// # for _ in 0..3 {
1113    /// #     results.push(stream.next().await.unwrap());
1114    /// # }
1115    /// # results.sort();
1116    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1117    /// # }));
1118    /// # }
1119    /// ```
1120    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1121    where
1122        F: Fn(&(K, V)) + 'a,
1123    {
1124        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1125
1126        KeyedStream::new(
1127            self.location.clone(),
1128            HydroNode::Inspect {
1129                f: inspect_f,
1130                input: Box::new(self.ir_node.into_inner()),
1131                metadata: self.location.new_node_metadata(Self::collection_kind()),
1132            },
1133        )
1134    }
1135
1136    /// An operator which allows you to "name" a `HydroNode`.
1137    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1138    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1139        {
1140            let mut node = self.ir_node.borrow_mut();
1141            let metadata = node.metadata_mut();
1142            metadata.tag = Some(name.to_string());
1143        }
1144        self
1145    }
1146}
1147
1148impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1149    KeyedStream<(K1, K2), V, L, B, O, R>
1150{
1151    /// Produces a new keyed stream by dropping the first element of the compound key.
1152    ///
1153    /// Because multiple keys may share the same suffix, this operation results in re-grouping
1154    /// of the values under the new keys. The values across groups with the same new key
1155    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1156    ///
1157    /// # Example
1158    /// ```rust
1159    /// # #[cfg(feature = "deploy")] {
1160    /// # use hydro_lang::prelude::*;
1161    /// # use futures::StreamExt;
1162    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1163    /// process
1164    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1165    ///     .into_keyed()
1166    ///     .drop_key_prefix()
1167    /// #   .entries()
1168    /// # }, |mut stream| async move {
1169    /// // { 10: [2, 3], 20: [4] }
1170    /// # let mut results = Vec::new();
1171    /// # for _ in 0..3 {
1172    /// #     results.push(stream.next().await.unwrap());
1173    /// # }
1174    /// # results.sort();
1175    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
1176    /// # }));
1177    /// # }
1178    /// ```
1179    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1180        self.entries()
1181            .map(q!(|((_k1, k2), v)| (k2, v)))
1182            .into_keyed()
1183    }
1184}
1185
1186impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1187    KeyedStream<K, V, L, Unbounded, O, R>
1188{
1189    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1190    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1191    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1192    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1193    ///
1194    /// Currently, both input streams must be [`Unbounded`].
1195    ///
1196    /// # Example
1197    /// ```rust
1198    /// # #[cfg(feature = "deploy")] {
1199    /// # use hydro_lang::prelude::*;
1200    /// # use futures::StreamExt;
1201    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1202    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
1203    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
1204    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
1205    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
1206    /// numbers1.interleave(numbers2)
1207    /// #   .entries()
1208    /// # }, |mut stream| async move {
1209    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1210    /// # let mut results = Vec::new();
1211    /// # for _ in 0..4 {
1212    /// #     results.push(stream.next().await.unwrap());
1213    /// # }
1214    /// # results.sort();
1215    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
1216    /// # }));
1217    /// # }
1218    /// ```
1219    pub fn interleave<O2: Ordering, R2: Retries>(
1220        self,
1221        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1222    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1223    where
1224        R: MinRetries<R2>,
1225    {
1226        let tick = self.location.tick();
1227        // Because the outputs are unordered, we can interleave batches from both streams.
1228        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1229        self.batch(&tick, nondet_batch_interleaving)
1230            .weakest_ordering()
1231            .chain(
1232                other
1233                    .batch(&tick, nondet_batch_interleaving)
1234                    .weakest_ordering(),
1235            )
1236            .all_ticks()
1237    }
1238}
1239
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant decides two things for the current key group: whether an element is emitted,
/// and whether later inputs for that group are still processed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1252
1253impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1254where
1255    L: Location<'a>,
1256{
1257    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1258    ///
1259    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1260    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1261    /// early by returning `None`.
1262    ///
1263    /// The function takes a mutable reference to the accumulator and the current element, and returns
1264    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1265    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1266    ///
1267    /// # Example
1268    /// ```rust
1269    /// # #[cfg(feature = "deploy")] {
1270    /// # use hydro_lang::prelude::*;
1271    /// # use futures::StreamExt;
1272    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1273    /// process
1274    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1275    ///     .into_keyed()
1276    ///     .scan(
1277    ///         q!(|| 0),
1278    ///         q!(|acc, x| {
1279    ///             *acc += x;
1280    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1281    ///         }),
1282    ///     )
1283    /// #   .entries()
1284    /// # }, |mut stream| async move {
1285    /// // Output: { 0: [1], 1: [3, 7] }
1286    /// # let mut results = Vec::new();
1287    /// # for _ in 0..3 {
1288    /// #     results.push(stream.next().await.unwrap());
1289    /// # }
1290    /// # results.sort();
1291    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1292    /// # }));
1293    /// # }
1294    /// ```
1295    pub fn scan<A, U, I, F>(
1296        self,
1297        init: impl IntoQuotedMut<'a, I, L> + Copy,
1298        f: impl IntoQuotedMut<'a, F, L> + Copy,
1299    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1300    where
1301        K: Clone + Eq + Hash,
1302        I: Fn() -> A + 'a,
1303        F: Fn(&mut A, V) -> Option<U> + 'a,
1304    {
1305        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1306        self.generator(
1307            init,
1308            q!({
1309                let orig = f;
1310                move |state, v| {
1311                    if let Some(out) = orig(state, v) {
1312                        Generate::Yield(out)
1313                    } else {
1314                        Generate::Break
1315                    }
1316                }
1317            }),
1318        )
1319    }
1320
1321    /// Iteratively processes the elements in each group using a state machine that can yield
1322    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1323    /// syntax in Rust, without requiring special syntax.
1324    ///
1325    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1326    /// state for each group. The second argument defines the processing logic, taking in a
1327    /// mutable reference to the group's state and the value to be processed. It emits a
1328    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1329    /// should be processed.
1330    ///
1331    /// # Example
1332    /// ```rust
1333    /// # #[cfg(feature = "deploy")] {
1334    /// # use hydro_lang::prelude::*;
1335    /// # use futures::StreamExt;
1336    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1337    /// process
1338    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1339    ///     .into_keyed()
1340    ///     .generator(
1341    ///         q!(|| 0),
1342    ///         q!(|acc, x| {
1343    ///             *acc += x;
1344    ///             if *acc > 100 {
1345    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1346    ///                     "done!".to_string()
1347    ///                 )
1348    ///             } else if *acc % 2 == 0 {
1349    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1350    ///                     "even".to_string()
1351    ///                 )
1352    ///             } else {
1353    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1354    ///             }
1355    ///         }),
1356    ///     )
1357    /// #   .entries()
1358    /// # }, |mut stream| async move {
1359    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1360    /// # let mut results = Vec::new();
1361    /// # for _ in 0..3 {
1362    /// #     results.push(stream.next().await.unwrap());
1363    /// # }
1364    /// # results.sort();
1365    /// # assert_eq!(results, vec![(0, "done!".to_string()), (0, "even".to_string()), (1, "even".to_string())]);
1366    /// # }));
1367    /// # }
1368    /// ```
1369    pub fn generator<A, U, I, F>(
1370        self,
1371        init: impl IntoQuotedMut<'a, I, L> + Copy,
1372        f: impl IntoQuotedMut<'a, F, L> + Copy,
1373    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1374    where
1375        K: Clone + Eq + Hash,
1376        I: Fn() -> A + 'a,
1377        F: Fn(&mut A, V) -> Generate<U> + 'a,
1378    {
1379        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1380        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1381
1382        let scan_init = q!(|| HashMap::new())
1383            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1384            .into();
1385        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1386            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1387            if let Some(existing_state_value) = existing_state {
1388                match f(existing_state_value, v) {
1389                    Generate::Yield(out) => Some(Some((k, out))),
1390                    Generate::Return(out) => {
1391                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1392                        Some(Some((k, out)))
1393                    }
1394                    Generate::Break => {
1395                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1396                        Some(None)
1397                    }
1398                    Generate::Continue => Some(None),
1399                }
1400            } else {
1401                Some(None)
1402            }
1403        })
1404        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1405        .into();
1406
1407        let scan_node = HydroNode::Scan {
1408            init: scan_init,
1409            acc: scan_f,
1410            input: Box::new(self.ir_node.into_inner()),
1411            metadata: self.location.new_node_metadata(Stream::<
1412                Option<(K, U)>,
1413                L,
1414                B,
1415                TotalOrder,
1416                ExactlyOnce,
1417            >::collection_kind()),
1418        };
1419
1420        let flatten_f = q!(|d| d)
1421            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1422            .into();
1423        let flatten_node = HydroNode::FlatMap {
1424            f: flatten_f,
1425            input: Box::new(scan_node),
1426            metadata: self.location.new_node_metadata(KeyedStream::<
1427                K,
1428                U,
1429                L,
1430                B,
1431                TotalOrder,
1432                ExactlyOnce,
1433            >::collection_kind()),
1434        };
1435
1436        KeyedStream::new(self.location.clone(), flatten_node)
1437    }
1438
1439    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1440    /// in-order across the values in each group. But the aggregation function returns a boolean,
1441    /// which when true indicates that the aggregated result is complete and can be released to
1442    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1443    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1444    /// normal stream elements.
1445    ///
1446    /// # Example
1447    /// ```rust
1448    /// # #[cfg(feature = "deploy")] {
1449    /// # use hydro_lang::prelude::*;
1450    /// # use futures::StreamExt;
1451    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1452    /// process
1453    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1454    ///     .into_keyed()
1455    ///     .fold_early_stop(
1456    ///         q!(|| 0),
1457    ///         q!(|acc, x| {
1458    ///             *acc += x;
1459    ///             x % 2 == 0
1460    ///         }),
1461    ///     )
1462    /// #   .entries()
1463    /// # }, |mut stream| async move {
1464    /// // Output: { 0: 2, 1: 9 }
1465    /// # let mut results = Vec::new();
1466    /// # for _ in 0..2 {
1467    /// #     results.push(stream.next().await.unwrap());
1468    /// # }
1469    /// # results.sort();
1470    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1471    /// # }));
1472    /// # }
1473    /// ```
1474    pub fn fold_early_stop<A, I, F>(
1475        self,
1476        init: impl IntoQuotedMut<'a, I, L> + Copy,
1477        f: impl IntoQuotedMut<'a, F, L> + Copy,
1478    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1479    where
1480        K: Clone + Eq + Hash,
1481        I: Fn() -> A + 'a,
1482        F: Fn(&mut A, V) -> bool + 'a,
1483    {
1484        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1485        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1486        let out_without_bound_cast = self.generator(
1487            q!(move || Some(init())),
1488            q!(move |key_state, v| {
1489                if let Some(key_state_value) = key_state.as_mut() {
1490                    if f(key_state_value, v) {
1491                        Generate::Return(key_state.take().unwrap())
1492                    } else {
1493                        Generate::Continue
1494                    }
1495                } else {
1496                    unreachable!()
1497                }
1498            }),
1499        );
1500
1501        KeyedSingleton::new(
1502            out_without_bound_cast.location.clone(),
1503            HydroNode::Cast {
1504                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1505                metadata: out_without_bound_cast
1506                    .location
1507                    .new_node_metadata(
1508                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1509                    ),
1510            },
1511        )
1512    }
1513
1514    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1515    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1516    /// otherwise the first element would be non-deterministic.
1517    ///
1518    /// # Example
1519    /// ```rust
1520    /// # #[cfg(feature = "deploy")] {
1521    /// # use hydro_lang::prelude::*;
1522    /// # use futures::StreamExt;
1523    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1524    /// process
1525    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1526    ///     .into_keyed()
1527    ///     .first()
1528    /// #   .entries()
1529    /// # }, |mut stream| async move {
1530    /// // Output: { 0: 2, 1: 3 }
1531    /// # let mut results = Vec::new();
1532    /// # for _ in 0..2 {
1533    /// #     results.push(stream.next().await.unwrap());
1534    /// # }
1535    /// # results.sort();
1536    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1537    /// # }));
1538    /// # }
1539    /// ```
1540    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1541    where
1542        K: Clone + Eq + Hash,
1543    {
1544        self.fold_early_stop(
1545            q!(|| None),
1546            q!(|acc, v| {
1547                *acc = Some(v);
1548                true
1549            }),
1550        )
1551        .map(q!(|v| v.unwrap()))
1552    }
1553}
1554
1555impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1556where
1557    L: Location<'a>,
1558{
1559    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1560    ///
1561    /// # Example
1562    /// ```rust
1563    /// # #[cfg(feature = "deploy")] {
1564    /// # use hydro_lang::prelude::*;
1565    /// # use futures::StreamExt;
1566    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1567    /// let tick = process.tick();
1568    /// let numbers = process
1569    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1570    ///     .into_keyed();
1571    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1572    /// batch
1573    ///     .value_counts()
1574    ///     .entries()
1575    ///     .all_ticks()
1576    /// # }, |mut stream| async move {
1577    /// // (1, 3), (2, 2)
1578    /// # let mut results = Vec::new();
1579    /// # for _ in 0..2 {
1580    /// #     results.push(stream.next().await.unwrap());
1581    /// # }
1582    /// # results.sort();
1583    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1584    /// # }));
1585    /// # }
1586    /// ```
1587    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1588    where
1589        K: Eq + Hash,
1590    {
1591        self.assume_ordering_trusted(
1592            nondet!(/** ordering within each group affects neither result nor intermediates */),
1593        )
1594        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1595    }
1596}
1597
1598impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1599where
1600    L: Location<'a>,
1601{
1602    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1603    /// group via the `comb` closure.
1604    ///
1605    /// Depending on the input stream guarantees, the closure may need to be commutative
1606    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1607    ///
1608    /// If the input and output value types are the same and do not require initialization then use
1609    /// [`KeyedStream::reduce`].
1610    ///
1611    /// # Example
1612    /// ```rust
1613    /// # #[cfg(feature = "deploy")] {
1614    /// # use hydro_lang::prelude::*;
1615    /// # use futures::StreamExt;
1616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617    /// let tick = process.tick();
1618    /// let numbers = process
1619    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1620    ///     .into_keyed();
1621    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1622    /// batch
1623    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1624    ///     .entries()
1625    ///     .all_ticks()
1626    /// # }, |mut stream| async move {
1627    /// // (1, false), (2, true)
1628    /// # let mut results = Vec::new();
1629    /// # for _ in 0..2 {
1630    /// #     results.push(stream.next().await.unwrap());
1631    /// # }
1632    /// # results.sort();
1633    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1634    /// # }));
1635    /// # }
1636    /// ```
1637    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1638        self,
1639        init: impl IntoQuotedMut<'a, I, L>,
1640        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1641    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1642    where
1643        K: Eq + Hash,
1644        C: ValidCommutativityFor<O>,
1645        Idemp: ValidIdempotenceFor<R>,
1646    {
1647        let init = init.splice_fn0_ctx(&self.location).into();
1648        let comb = comb
1649            .splice_fn2_borrow_mut_ctx_props(&self.location)
1650            .0
1651            .into();
1652
1653        let ordered = self
1654            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1655            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1656
1657        KeyedSingleton::new(
1658            ordered.location.clone(),
1659            HydroNode::FoldKeyed {
1660                init,
1661                acc: comb,
1662                input: Box::new(ordered.ir_node.into_inner()),
1663                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1664                    K,
1665                    A,
1666                    L,
1667                    B::WhenValueUnbounded,
1668                >::collection_kind()),
1669            },
1670        )
1671    }
1672
1673    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1674    /// group via the `comb` closure.
1675    ///
1676    /// Depending on the input stream guarantees, the closure may need to be commutative
1677    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1678    ///
1679    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1680    ///
1681    /// # Example
1682    /// ```rust
1683    /// # #[cfg(feature = "deploy")] {
1684    /// # use hydro_lang::prelude::*;
1685    /// # use futures::StreamExt;
1686    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1687    /// let tick = process.tick();
1688    /// let numbers = process
1689    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1690    ///     .into_keyed();
1691    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1692    /// batch
1693    ///     .reduce(q!(|acc, x| *acc |= x))
1694    ///     .entries()
1695    ///     .all_ticks()
1696    /// # }, |mut stream| async move {
1697    /// // (1, false), (2, true)
1698    /// # let mut results = Vec::new();
1699    /// # for _ in 0..2 {
1700    /// #     results.push(stream.next().await.unwrap());
1701    /// # }
1702    /// # results.sort();
1703    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1704    /// # }));
1705    /// # }
1706    /// ```
1707    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1708        self,
1709        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1710    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1711    where
1712        K: Eq + Hash,
1713        C: ValidCommutativityFor<O>,
1714        Idemp: ValidIdempotenceFor<R>,
1715    {
1716        let f = comb
1717            .splice_fn2_borrow_mut_ctx_props(&self.location)
1718            .0
1719            .into();
1720
1721        let ordered = self
1722            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1723            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1724
1725        KeyedSingleton::new(
1726            ordered.location.clone(),
1727            HydroNode::ReduceKeyed {
1728                f,
1729                input: Box::new(ordered.ir_node.into_inner()),
1730                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1731                    K,
1732                    V,
1733                    L,
1734                    B::WhenValueUnbounded,
1735                >::collection_kind()),
1736            },
1737        )
1738    }
1739
1740    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1741    /// are automatically deleted.
1742    ///
1743    /// Depending on the input stream guarantees, the closure may need to be commutative
1744    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1745    ///
1746    /// # Example
1747    /// ```rust
1748    /// # #[cfg(feature = "deploy")] {
1749    /// # use hydro_lang::prelude::*;
1750    /// # use futures::StreamExt;
1751    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1752    /// let tick = process.tick();
1753    /// let watermark = tick.singleton(q!(1));
1754    /// let numbers = process
1755    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1756    ///     .into_keyed();
1757    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1758    /// batch
1759    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1760    ///     .entries()
1761    ///     .all_ticks()
1762    /// # }, |mut stream| async move {
1763    /// // (2, true)
1764    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1765    /// # }));
1766    /// # }
1767    /// ```
1768    pub fn reduce_watermark<O2, F, C, Idemp>(
1769        self,
1770        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1771        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1772    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1773    where
1774        K: Eq + Hash,
1775        O2: Clone,
1776        F: Fn(&mut V, V) + 'a,
1777        C: ValidCommutativityFor<O>,
1778        Idemp: ValidIdempotenceFor<R>,
1779    {
1780        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1781        check_matching_location(&self.location.root(), other.location.outer());
1782        let f = comb
1783            .splice_fn2_borrow_mut_ctx_props(&self.location)
1784            .0
1785            .into();
1786
1787        let ordered = self
1788            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1789            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1790
1791        KeyedSingleton::new(
1792            ordered.location.clone(),
1793            HydroNode::ReduceKeyedWatermark {
1794                f,
1795                input: Box::new(ordered.ir_node.into_inner()),
1796                watermark: Box::new(other.ir_node.into_inner()),
1797                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1798                    K,
1799                    V,
1800                    L,
1801                    B::WhenValueUnbounded,
1802                >::collection_kind()),
1803            },
1804        )
1805    }
1806
1807    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1808    /// whose keys are not in the bounded stream.
1809    ///
1810    /// # Example
1811    /// ```rust
1812    /// # #[cfg(feature = "deploy")] {
1813    /// # use hydro_lang::prelude::*;
1814    /// # use futures::StreamExt;
1815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1816    /// let tick = process.tick();
1817    /// let keyed_stream = process
1818    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1819    ///     .batch(&tick, nondet!(/** test */))
1820    ///     .into_keyed();
1821    /// let keys_to_remove = process
1822    ///     .source_iter(q!(vec![1, 2]))
1823    ///     .batch(&tick, nondet!(/** test */));
1824    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1825    /// #   .entries()
1826    /// # }, |mut stream| async move {
1827    /// // { 3: ['c'], 4: ['d'] }
1828    /// # let mut results = Vec::new();
1829    /// # for _ in 0..2 {
1830    /// #     results.push(stream.next().await.unwrap());
1831    /// # }
1832    /// # results.sort();
1833    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1834    /// # }));
1835    /// # }
1836    /// ```
1837    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1838        self,
1839        other: Stream<K, L, Bounded, O2, R2>,
1840    ) -> Self
1841    where
1842        K: Eq + Hash,
1843    {
1844        check_matching_location(&self.location, &other.location);
1845
1846        KeyedStream::new(
1847            self.location.clone(),
1848            HydroNode::AntiJoin {
1849                pos: Box::new(self.ir_node.into_inner()),
1850                neg: Box::new(other.ir_node.into_inner()),
1851                metadata: self.location.new_node_metadata(Self::collection_kind()),
1852            },
1853        )
1854    }
1855}
1856
1857impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1858where
1859    L: Location<'a>,
1860{
1861    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1862    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1863    ///
1864    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1865    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1866    /// argument that declares where the stream will be atomically processed. Batching a stream into
1867    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1868    /// [`Tick`] will introduce asynchrony.
1869    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1870        let out_location = Atomic { tick: tick.clone() };
1871        KeyedStream::new(
1872            out_location.clone(),
1873            HydroNode::BeginAtomic {
1874                inner: Box::new(self.ir_node.into_inner()),
1875                metadata: out_location
1876                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1877            },
1878        )
1879    }
1880
1881    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1882    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1883    /// the order of the input.
1884    ///
1885    /// # Non-Determinism
1886    /// The batch boundaries are non-deterministic and may change across executions.
1887    pub fn batch(
1888        self,
1889        tick: &Tick<L>,
1890        nondet: NonDet,
1891    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1892        let _ = nondet;
1893        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1894        KeyedStream::new(
1895            tick.clone(),
1896            HydroNode::Batch {
1897                inner: Box::new(self.ir_node.into_inner()),
1898                metadata: tick.new_node_metadata(
1899                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1900                ),
1901            },
1902        )
1903    }
1904}
1905
1906impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1907where
1908    L: Location<'a> + NoTick,
1909{
1910    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1911    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1912    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1913    /// used to create the atomic section.
1914    ///
1915    /// # Non-Determinism
1916    /// The batch boundaries are non-deterministic and may change across executions.
1917    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1918        let _ = nondet;
1919        KeyedStream::new(
1920            self.location.clone().tick,
1921            HydroNode::Batch {
1922                inner: Box::new(self.ir_node.into_inner()),
1923                metadata: self.location.tick.new_node_metadata(KeyedStream::<
1924                    K,
1925                    V,
1926                    Tick<L>,
1927                    Bounded,
1928                    O,
1929                    R,
1930                >::collection_kind(
1931                )),
1932            },
1933        )
1934    }
1935
1936    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1937    /// See [`KeyedStream::atomic`] for more details.
1938    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1939        KeyedStream::new(
1940            self.location.tick.l.clone(),
1941            HydroNode::EndAtomic {
1942                inner: Box::new(self.ir_node.into_inner()),
1943                metadata: self
1944                    .location
1945                    .tick
1946                    .l
1947                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1948            },
1949        )
1950    }
1951}
1952
1953impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1954where
1955    L: Location<'a>,
1956{
1957    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1958    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1959    /// is only present in one of the inputs, its values are passed through as-is). The output has
1960    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1961    ///
1962    /// Currently, both input streams must be [`Bounded`]. This operator will block
1963    /// on the first stream until all its elements are available. In a future version,
1964    /// we will relax the requirement on the `other` stream.
1965    ///
1966    /// # Example
1967    /// ```rust
1968    /// # #[cfg(feature = "deploy")] {
1969    /// # use hydro_lang::prelude::*;
1970    /// # use futures::StreamExt;
1971    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1972    /// let tick = process.tick();
1973    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1974    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1975    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1976    /// # .entries()
1977    /// # }, |mut stream| async move {
1978    /// // { 0: [2, 1], 1: [4, 3] }
1979    /// # let mut results = Vec::new();
1980    /// # for _ in 0..4 {
1981    /// #     results.push(stream.next().await.unwrap());
1982    /// # }
1983    /// # results.sort();
1984    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
1985    /// # }));
1986    /// # }
1987    /// ```
1988    pub fn chain<O2: Ordering, R2: Retries>(
1989        self,
1990        other: KeyedStream<K, V, L, Bounded, O2, R2>,
1991    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1992    where
1993        O: MinOrder<O2>,
1994        R: MinRetries<R2>,
1995    {
1996        check_matching_location(&self.location, &other.location);
1997
1998        KeyedStream::new(
1999            self.location.clone(),
2000            HydroNode::Chain {
2001                first: Box::new(self.ir_node.into_inner()),
2002                second: Box::new(other.ir_node.into_inner()),
2003                metadata: self.location.new_node_metadata(KeyedStream::<
2004                    K,
2005                    V,
2006                    L,
2007                    Bounded,
2008                    <O as MinOrder<O2>>::Min,
2009                    <R as MinRetries<R2>>::Min,
2010                >::collection_kind()),
2011            },
2012        )
2013    }
2014}
2015
2016impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2017where
2018    L: Location<'a>,
2019{
2020    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2021    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2022    /// each key.
2023    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2024        KeyedStream::new(
2025            self.location.outer().clone(),
2026            HydroNode::YieldConcat {
2027                inner: Box::new(self.ir_node.into_inner()),
2028                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2029                    K,
2030                    V,
2031                    L,
2032                    Unbounded,
2033                    O,
2034                    R,
2035                >::collection_kind(
2036                )),
2037            },
2038        )
2039    }
2040
2041    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2042    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2043    /// each key.
2044    ///
2045    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2046    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2047    /// stream's [`Tick`] context.
2048    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2049        let out_location = Atomic {
2050            tick: self.location.clone(),
2051        };
2052
2053        KeyedStream::new(
2054            out_location.clone(),
2055            HydroNode::YieldConcat {
2056                inner: Box::new(self.ir_node.into_inner()),
2057                metadata: out_location.new_node_metadata(KeyedStream::<
2058                    K,
2059                    V,
2060                    Atomic<L>,
2061                    Unbounded,
2062                    O,
2063                    R,
2064                >::collection_kind()),
2065            },
2066        )
2067    }
2068
2069    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2070    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2071    ///
2072    /// This API is particularly useful for stateful computation on batches of data, such as
2073    /// maintaining an accumulated state that is up to date with the current batch.
2074    ///
2075    /// # Example
2076    /// ```rust
2077    /// # #[cfg(feature = "deploy")] {
2078    /// # use hydro_lang::prelude::*;
2079    /// # use futures::StreamExt;
2080    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2081    /// let tick = process.tick();
2082    /// # // ticks are lazy by default, forces the second tick to run
2083    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2084    /// # let batch_first_tick = process
2085    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2086    /// #   .into_keyed()
2087    /// #   .batch(&tick, nondet!(/** test */));
2088    /// # let batch_second_tick = process
2089    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2090    /// #   .into_keyed()
2091    /// #   .batch(&tick, nondet!(/** test */))
2092    /// #   .defer_tick(); // appears on the second tick
2093    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2094    ///
2095    /// input.batch(&tick, nondet!(/** test */))
2096    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2097    ///         *sum += new;
2098    ///     }))).entries().all_ticks()
2099    /// # }, |mut stream| async move {
2100    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2101    /// # let mut results = Vec::new();
2102    /// # for _ in 0..4 {
2103    /// #     results.push(stream.next().await.unwrap());
2104    /// # }
2105    /// # results.sort();
2106    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2107    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2108    /// # results.clear();
2109    /// # for _ in 0..4 {
2110    /// #     results.push(stream.next().await.unwrap());
2111    /// # }
2112    /// # results.sort();
2113    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2114    /// # }));
2115    /// # }
2116    /// ```
2117    pub fn across_ticks<Out: BatchAtomic>(
2118        self,
2119        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2120    ) -> Out::Batched {
2121        thunk(self.all_ticks_atomic()).batched_atomic()
2122    }
2123
2124    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2125    /// tick `T` always has the entries of `self` at tick `T - 1`.
2126    ///
2127    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2128    ///
2129    /// This operator enables stateful iterative processing with ticks, by sending data from one
2130    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2131    ///
2132    /// # Example
2133    /// ```rust
2134    /// # #[cfg(feature = "deploy")] {
2135    /// # use hydro_lang::prelude::*;
2136    /// # use futures::StreamExt;
2137    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2138    /// let tick = process.tick();
2139    /// # // ticks are lazy by default, forces the second tick to run
2140    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2141    /// # let batch_first_tick = process
2142    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2143    /// #   .batch(&tick, nondet!(/** test */))
2144    /// #   .into_keyed();
2145    /// # let batch_second_tick = process
2146    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2147    /// #   .batch(&tick, nondet!(/** test */))
2148    /// #   .defer_tick()
2149    /// #   .into_keyed(); // appears on the second tick
2150    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2151    /// # batch_first_tick.chain(batch_second_tick);
2152    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2153    ///     changes_across_ticks // from the current tick
2154    /// )
2155    /// # .entries().all_ticks()
2156    /// # }, |mut stream| async move {
2157    /// // First tick: { 1: [2, 3] }
2158    /// # let mut results = Vec::new();
2159    /// # for _ in 0..2 {
2160    /// #     results.push(stream.next().await.unwrap());
2161    /// # }
2162    /// # results.sort();
2163    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2164    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2165    /// # results.clear();
2166    /// # for _ in 0..4 {
2167    /// #     results.push(stream.next().await.unwrap());
2168    /// # }
2169    /// # results.sort();
2170    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2171    /// // Third tick: { 1: [4], 2: [5] }
2172    /// # results.clear();
2173    /// # for _ in 0..2 {
2174    /// #     results.push(stream.next().await.unwrap());
2175    /// # }
2176    /// # results.sort();
2177    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2178    /// # }));
2179    /// # }
2180    /// ```
2181    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2182        KeyedStream::new(
2183            self.location.clone(),
2184            HydroNode::DeferTick {
2185                input: Box::new(self.ir_node.into_inner()),
2186                metadata: self.location.new_node_metadata(KeyedStream::<
2187                    K,
2188                    V,
2189                    Tick<L>,
2190                    Bounded,
2191                    O,
2192                    R,
2193                >::collection_kind()),
2194            },
2195        )
2196    }
2197}
2198
2199#[cfg(test)]
2200mod tests {
2201    #[cfg(feature = "deploy")]
2202    use futures::{SinkExt, StreamExt};
2203    #[cfg(feature = "deploy")]
2204    use hydro_deploy::Deployment;
2205    #[cfg(any(feature = "deploy", feature = "sim"))]
2206    use stageleft::q;
2207
2208    #[cfg(any(feature = "deploy", feature = "sim"))]
2209    use crate::compile::builder::FlowBuilder;
2210    #[cfg(feature = "deploy")]
2211    use crate::live_collections::stream::ExactlyOnce;
2212    #[cfg(feature = "sim")]
2213    use crate::live_collections::stream::{NoOrder, TotalOrder};
2214    #[cfg(any(feature = "deploy", feature = "sim"))]
2215    use crate::location::Location;
2216    #[cfg(any(feature = "deploy", feature = "sim"))]
2217    use crate::nondet::nondet;
2218    #[cfg(feature = "deploy")]
2219    use crate::properties::ManualProof;
2220
    /// Runs `reduce_watermark` (summing values) over an unbounded `source_stream` input with a
    /// constant watermark of `1`, and expects the first emitted entry to be `(2, 204)`: the two
    /// values for key `2` summed, while keys `0` and `1` are at or below the watermark.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            // Sample the reduced state into the tick so it can be streamed out per tick.
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output after deploying but before starting (presumably so that no
        // output is missed).
        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2265
    /// Same scenario as `reduce_watermark_filter`, but the input is a bounded `source_iter`, so
    /// the reduced result is turned into a stream with `.entries()` directly, with no `snapshot`.
    /// Expects the first emitted entry to be `(2, 204)`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output after deploying but before starting.
        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2303
    /// Exercises `reduce_watermark` with a watermark that advances on every tick (via a tick
    /// cycle), with default optimizations enabled. First expects `(2, 204)` from the initial
    /// input, then sends a trigger from the external client and expects the injected `(3, 103)`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // The watermark starts at 1 and is incremented by one for the next tick, every tick.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // Emits `(3, 103)` only in ticks where at least one trigger message was batched.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                // The interleaved input presumably carries no ordering guarantee, so the
                // combinator is explicitly declared commutative.
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = ManualProof(/* integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Trigger the injection of `(3, 103)` and expect it in the output.
        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
2374
2375    #[cfg(feature = "sim")]
2376    #[test]
2377    #[should_panic]
2378    fn sim_batch_nondet_size() {
2379        let flow = FlowBuilder::new();
2380        let node = flow.process::<()>();
2381
2382        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2383
2384        let tick = node.tick();
2385        let out_recv = input
2386            .batch(&tick, nondet!(/** test */))
2387            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2388            .entries()
2389            .all_ticks()
2390            .sim_output();
2391
2392        flow.sim().exhaustive(async || {
2393            out_recv
2394                .assert_yields_only_unordered([(1, vec![1, 2])])
2395                .await;
2396        });
2397    }
2398
    /// Checks that batching a totally-ordered keyed stream only varies where batch
    /// boundaries fall, and never the order of values within a key.
    ///
    /// The early-stop fold keeps the running maximum and stops once it reaches 2. That
    /// yields `(1, 2)` for key 1 whether `1` or `2` is seen first, so the output
    /// assertion alone does not detect reordering. The ordering property is carried by
    /// the exhaustive instance count. Compare `sim_batch_unordered_shuffles`, where
    /// within-key permutations are also explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    // Returning true ends the fold for this key.
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        // Only batch boundaries are explored; (1, 1) always precedes (1, 2).
        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2433
    /// Checks that batching a keyed stream whose ordering was weakened with
    /// `weakest_ordering` explores both batch boundaries and the relative order of
    /// values within a key.
    ///
    /// Compared with `sim_batch_preserves_group_order` (8 instances), the extra
    /// instances come from permuting `(1, 1)` and `(1, 2)` when they land in different
    /// ticks. Within a single batch the data stays unordered, so no extra instances
    /// arise there.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // Drop the ordering guarantee of the source so the simulator may reorder values.
        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weakest_ordering();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
2464
2465    #[cfg(feature = "sim")]
2466    #[test]
2467    #[should_panic]
2468    fn sim_observe_order_batched() {
2469        let flow = FlowBuilder::new();
2470        let node = flow.process::<()>();
2471
2472        let (port, input) = node.sim_input::<_, NoOrder, _>();
2473
2474        let tick = node.tick();
2475        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2476        let out_recv = batch
2477            .assume_ordering::<TotalOrder>(nondet!(/** test */))
2478            .all_ticks()
2479            .first()
2480            .entries()
2481            .sim_output();
2482
2483        flow.sim().exhaustive(async || {
2484            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2485            out_recv
2486                .assert_yields_only_unordered([(1, 1), (2, 1)])
2487                .await; // fails with assume_ordering
2488        });
2489    }
2490
    /// Pins the number of schedules the exhaustive simulator explores when a
    /// `NoOrder` keyed batch is treated as totally ordered via `assume_ordering`.
    ///
    /// The output is drained with `collect_sorted` and its contents are ignored. Only
    /// the instance count is asserted, so a change in how the simulator enumerates
    /// batch boundaries or orderings shows up here.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            // Drain all output; the values themselves are not checked.
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
2514}