Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::stream::{
27    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
28};
29#[cfg(stageleft_runtime)]
30use crate::location::dynamic::{DynLocation, LocationId};
31use crate::location::tick::DeferTick;
32use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
33use crate::manual_expr::ManualExpr;
34use crate::nondet::{NonDet, nondet};
35use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
36
37pub mod networking;
38
39/// Streaming elements of type `V` grouped by a key of type `K`.
40///
41/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
42/// order of keys is non-deterministic but the order *within* each group may be deterministic.
43///
44/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
45/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
46/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
47///
48/// Type Parameters:
49/// - `K`: the type of the key for each group
50/// - `V`: the type of the elements inside each group
51/// - `Loc`: the [`Location`] where the keyed stream is materialized
52/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
53/// - `Order`: tracks whether the elements within each group have deterministic order
54///   ([`TotalOrder`]) or not ([`NoOrder`])
55/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
56///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
57pub struct KeyedStream<
58    K,
59    V,
60    Loc,
61    Bound: Boundedness = Unbounded,
62    Order: Ordering = TotalOrder,
63    Retry: Retries = ExactlyOnce,
64> {
65    pub(crate) location: Loc,
66    pub(crate) ir_node: RefCell<HydroNode>,
67    pub(crate) flow_state: FlowState,
68
69    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
70}
71
72impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
73    fn drop(&mut self) {
74        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
75        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
76            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
77                input: Box::new(ir_node),
78                op_metadata: HydroIrOpMetadata::new(),
79            });
80        }
81    }
82}
83
84impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
85    for KeyedStream<K, V, L, Unbounded, O, R>
86where
87    L: Location<'a>,
88{
89    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
90        let new_meta = stream
91            .location
92            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
93
94        KeyedStream {
95            location: stream.location.clone(),
96            flow_state: stream.flow_state.clone(),
97            ir_node: RefCell::new(HydroNode::Cast {
98                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
99                metadata: new_meta,
100            }),
101            _phantom: PhantomData,
102        }
103    }
104}
105
106impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
107    for KeyedStream<K, V, L, B, NoOrder, R>
108where
109    L: Location<'a>,
110{
111    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
112        stream.weaken_ordering()
113    }
114}
115
116impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
117where
118    L: Location<'a>,
119{
120    fn defer_tick(self) -> Self {
121        KeyedStream::defer_tick(self)
122    }
123}
124
125impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
126    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
127where
128    L: Location<'a>,
129{
130    type Location = Tick<L>;
131
132    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
133        KeyedStream {
134            flow_state: location.flow_state().clone(),
135            location: location.clone(),
136            ir_node: RefCell::new(HydroNode::CycleSource {
137                cycle_id,
138                metadata: location.new_node_metadata(
139                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
140                ),
141            }),
142            _phantom: PhantomData,
143        }
144    }
145}
146
147impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
148    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
149where
150    L: Location<'a>,
151{
152    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
153        assert_eq!(
154            Location::id(&self.location),
155            expected_location,
156            "locations do not match"
157        );
158
159        self.location
160            .flow_state()
161            .borrow_mut()
162            .push_root(HydroRoot::CycleSink {
163                cycle_id,
164                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
165                op_metadata: HydroIrOpMetadata::new(),
166            });
167    }
168}
169
170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
171    for KeyedStream<K, V, L, B, O, R>
172where
173    L: Location<'a> + NoTick,
174{
175    type Location = L;
176
177    fn create_source(cycle_id: CycleId, location: L) -> Self {
178        KeyedStream {
179            flow_state: location.flow_state().clone(),
180            location: location.clone(),
181            ir_node: RefCell::new(HydroNode::CycleSource {
182                cycle_id,
183                metadata: location
184                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
185            }),
186            _phantom: PhantomData,
187        }
188    }
189}
190
191impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
192    for KeyedStream<K, V, L, B, O, R>
193where
194    L: Location<'a> + NoTick,
195{
196    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
197        assert_eq!(
198            Location::id(&self.location),
199            expected_location,
200            "locations do not match"
201        );
202        self.location
203            .flow_state()
204            .borrow_mut()
205            .push_root(HydroRoot::CycleSink {
206                cycle_id,
207                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
208                op_metadata: HydroIrOpMetadata::new(),
209            });
210    }
211}
212
213impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
214    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
215{
216    fn clone(&self) -> Self {
217        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
218            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
219            *self.ir_node.borrow_mut() = HydroNode::Tee {
220                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
221                metadata: self.location.new_node_metadata(Self::collection_kind()),
222            };
223        }
224
225        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
226            KeyedStream {
227                location: self.location.clone(),
228                flow_state: self.flow_state.clone(),
229                ir_node: HydroNode::Tee {
230                    inner: SharedNode(inner.0.clone()),
231                    metadata: metadata.clone(),
232                }
233                .into(),
234                _phantom: PhantomData,
235            }
236        } else {
237            unreachable!()
238        }
239    }
240}
241
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant answers two independent questions: is an element emitted, and does the
/// generator keep processing subsequent inputs?
///
/// | Variant    | Emits a `T` | Keeps processing |
/// |------------|-------------|------------------|
/// | `Yield`    | yes         | yes              |
/// | `Return`   | yes         | no               |
/// | `Continue` | no          | yes              |
/// | `Break`    | no          | no               |
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
254
255impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
256    KeyedStream<K, V, L, B, O, R>
257{
258    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
259        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
260        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
261
262        let flow_state = location.flow_state().clone();
263        KeyedStream {
264            location,
265            flow_state,
266            ir_node: RefCell::new(ir_node),
267            _phantom: PhantomData,
268        }
269    }
270
271    /// Returns the [`CollectionKind`] corresponding to this type.
272    pub fn collection_kind() -> CollectionKind {
273        CollectionKind::KeyedStream {
274            bound: B::BOUND_KIND,
275            value_order: O::ORDERING_KIND,
276            value_retry: R::RETRIES_KIND,
277            key_type: stageleft::quote_type::<K>().into(),
278            value_type: stageleft::quote_type::<V>().into(),
279        }
280    }
281
282    /// Returns the [`Location`] where this keyed stream is being materialized.
283    pub fn location(&self) -> &L {
284        &self.location
285    }
286
287    /// Explicitly "casts" the keyed stream to a type with a different ordering
288    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
289    /// by the type-system.
290    ///
291    /// # Non-Determinism
292    /// This function is used as an escape hatch, and any mistakes in the
293    /// provided ordering guarantee will propagate into the guarantees
294    /// for the rest of the program.
295    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
296        if O::ORDERING_KIND == O2::ORDERING_KIND {
297            KeyedStream::new(
298                self.location.clone(),
299                self.ir_node.replace(HydroNode::Placeholder),
300            )
301        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
302            // We can always weaken the ordering guarantee
303            KeyedStream::new(
304                self.location.clone(),
305                HydroNode::Cast {
306                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
307                    metadata: self
308                        .location
309                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
310                },
311            )
312        } else {
313            KeyedStream::new(
314                self.location.clone(),
315                HydroNode::ObserveNonDet {
316                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
317                    trusted: false,
318                    metadata: self
319                        .location
320                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
321                },
322            )
323        }
324    }
325
326    fn assume_ordering_trusted<O2: Ordering>(
327        self,
328        _nondet: NonDet,
329    ) -> KeyedStream<K, V, L, B, O2, R> {
330        if O::ORDERING_KIND == O2::ORDERING_KIND {
331            KeyedStream::new(
332                self.location.clone(),
333                self.ir_node.replace(HydroNode::Placeholder),
334            )
335        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
336            // We can always weaken the ordering guarantee
337            KeyedStream::new(
338                self.location.clone(),
339                HydroNode::Cast {
340                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341                    metadata: self
342                        .location
343                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
344                },
345            )
346        } else {
347            KeyedStream::new(
348                self.location.clone(),
349                HydroNode::ObserveNonDet {
350                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
351                    trusted: true,
352                    metadata: self
353                        .location
354                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
355                },
356            )
357        }
358    }
359
360    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
361    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
362    /// which is always safe because that is the weakest possible guarantee.
363    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
364        self.weaken_ordering::<NoOrder>()
365    }
366
367    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
368    /// enforcing that `O2` is weaker than the input ordering guarantee.
369    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
370        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
371        self.assume_ordering::<O2>(nondet)
372    }
373
374    /// Explicitly "casts" the keyed stream to a type with a different retries
375    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
376    /// be proven by the type-system.
377    ///
378    /// # Non-Determinism
379    /// This function is used as an escape hatch, and any mistakes in the
380    /// provided retries guarantee will propagate into the guarantees
381    /// for the rest of the program.
382    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
383        if R::RETRIES_KIND == R2::RETRIES_KIND {
384            KeyedStream::new(
385                self.location.clone(),
386                self.ir_node.replace(HydroNode::Placeholder),
387            )
388        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
389            // We can always weaken the retries guarantee
390            KeyedStream::new(
391                self.location.clone(),
392                HydroNode::Cast {
393                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394                    metadata: self
395                        .location
396                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
397                },
398            )
399        } else {
400            KeyedStream::new(
401                self.location.clone(),
402                HydroNode::ObserveNonDet {
403                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
404                    trusted: false,
405                    metadata: self
406                        .location
407                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
408                },
409            )
410        }
411    }
412
413    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
414    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
415    /// which is always safe because that is the weakest possible guarantee.
416    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
417        self.weaken_retries::<AtLeastOnce>()
418    }
419
420    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
421    /// enforcing that `R2` is weaker than the input retries guarantee.
422    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
423        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
424        self.assume_retries::<R2>(nondet)
425    }
426
427    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
428    /// implies that `O == TotalOrder`.
429    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
430    where
431        O: IsOrdered,
432    {
433        self.assume_ordering(nondet!(/** no-op */))
434    }
435
436    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
437    /// implies that `R == ExactlyOnce`.
438    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
439    where
440        R: IsExactlyOnce,
441    {
442        self.assume_retries(nondet!(/** no-op */))
443    }
444
445    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
446    /// implies that `B == Bounded`.
447    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
448    where
449        B: IsBounded,
450    {
451        KeyedStream::new(
452            self.location.clone(),
453            self.ir_node.replace(HydroNode::Placeholder),
454        )
455    }
456
457    /// Flattens the keyed stream into an unordered stream of key-value pairs.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// process
466    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
467    ///     .into_keyed()
468    ///     .entries()
469    /// # }, |mut stream| async move {
470    /// // (1, 2), (1, 3), (2, 4) in any order
471    /// # let mut results = Vec::new();
472    /// # for _ in 0..3 {
473    /// #     results.push(stream.next().await.unwrap());
474    /// # }
475    /// # results.sort();
476    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
477    /// # }));
478    /// # }
479    /// ```
480    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
481        Stream::new(
482            self.location.clone(),
483            HydroNode::Cast {
484                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
485                metadata: self
486                    .location
487                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
488            },
489        )
490    }
491
492    /// Flattens the keyed stream into an unordered stream of only the values.
493    ///
494    /// # Example
495    /// ```rust
496    /// # #[cfg(feature = "deploy")] {
497    /// # use hydro_lang::prelude::*;
498    /// # use futures::StreamExt;
499    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
500    /// process
501    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
502    ///     .into_keyed()
503    ///     .values()
504    /// # }, |mut stream| async move {
505    /// // 2, 3, 4 in any order
506    /// # let mut results = Vec::new();
507    /// # for _ in 0..3 {
508    /// #     results.push(stream.next().await.unwrap());
509    /// # }
510    /// # results.sort();
511    /// # assert_eq!(results, vec![2, 3, 4]);
512    /// # }));
513    /// # }
514    /// ```
515    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
516        self.entries().map(q!(|(_, v)| v))
517    }
518
519    /// Flattens the keyed stream into an unordered stream of just the keys.
520    ///
521    /// # Example
522    /// ```rust
523    /// # #[cfg(feature = "deploy")] {
524    /// # use hydro_lang::prelude::*;
525    /// # use futures::StreamExt;
526    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
527    /// # process
528    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
529    /// #     .into_keyed()
530    /// #     .keys()
531    /// # }, |mut stream| async move {
532    /// // 1, 2 in any order
533    /// # let mut results = Vec::new();
534    /// # for _ in 0..2 {
535    /// #     results.push(stream.next().await.unwrap());
536    /// # }
537    /// # results.sort();
538    /// # assert_eq!(results, vec![1, 2]);
539    /// # }));
540    /// # }
541    /// ```
542    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
543    where
544        K: Eq + Hash,
545    {
546        self.entries().map(q!(|(k, _)| k)).unique()
547    }
548
549    /// Transforms each value by invoking `f` on each element, with keys staying the same
550    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
551    ///
552    /// If you do not want to modify the stream and instead only want to view
553    /// each item use [`KeyedStream::inspect`] instead.
554    ///
555    /// # Example
556    /// ```rust
557    /// # #[cfg(feature = "deploy")] {
558    /// # use hydro_lang::prelude::*;
559    /// # use futures::StreamExt;
560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561    /// process
562    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
563    ///     .into_keyed()
564    ///     .map(q!(|v| v + 1))
565    /// #   .entries()
566    /// # }, |mut stream| async move {
567    /// // { 1: [3, 4], 2: [5] }
568    /// # let mut results = Vec::new();
569    /// # for _ in 0..3 {
570    /// #     results.push(stream.next().await.unwrap());
571    /// # }
572    /// # results.sort();
573    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
574    /// # }));
575    /// # }
576    /// ```
577    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
578    where
579        F: Fn(V) -> U + 'a,
580    {
581        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
582        let map_f = q!({
583            let orig = f;
584            move |(k, v)| (k, orig(v))
585        })
586        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
587        .into();
588
589        KeyedStream::new(
590            self.location.clone(),
591            HydroNode::Map {
592                f: map_f,
593                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594                metadata: self
595                    .location
596                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
597            },
598        )
599    }
600
601    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
602    /// re-grouped even they are tuples; instead they will be grouped under the original key.
603    ///
604    /// If you do not want to modify the stream and instead only want to view
605    /// each item use [`KeyedStream::inspect_with_key`] instead.
606    ///
607    /// # Example
608    /// ```rust
609    /// # #[cfg(feature = "deploy")] {
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// process
614    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
615    ///     .into_keyed()
616    ///     .map_with_key(q!(|(k, v)| k + v))
617    /// #   .entries()
618    /// # }, |mut stream| async move {
619    /// // { 1: [3, 4], 2: [6] }
620    /// # let mut results = Vec::new();
621    /// # for _ in 0..3 {
622    /// #     results.push(stream.next().await.unwrap());
623    /// # }
624    /// # results.sort();
625    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
626    /// # }));
627    /// # }
628    /// ```
629    pub fn map_with_key<U, F>(
630        self,
631        f: impl IntoQuotedMut<'a, F, L> + Copy,
632    ) -> KeyedStream<K, U, L, B, O, R>
633    where
634        F: Fn((K, V)) -> U + 'a,
635        K: Clone,
636    {
637        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
638        let map_f = q!({
639            let orig = f;
640            move |(k, v)| {
641                let out = orig((Clone::clone(&k), v));
642                (k, out)
643            }
644        })
645        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
646        .into();
647
648        KeyedStream::new(
649            self.location.clone(),
650            HydroNode::Map {
651                f: map_f,
652                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
653                metadata: self
654                    .location
655                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
656            },
657        )
658    }
659
660    /// Prepends a new value to the key of each element in the stream, producing a new
661    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
662    /// occurs and the elements in each group preserve their original order.
663    ///
664    /// # Example
665    /// ```rust
666    /// # #[cfg(feature = "deploy")] {
667    /// # use hydro_lang::prelude::*;
668    /// # use futures::StreamExt;
669    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
670    /// process
671    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
672    ///     .into_keyed()
673    ///     .prefix_key(q!(|&(k, _)| k % 2))
674    /// #   .entries()
675    /// # }, |mut stream| async move {
676    /// // { (1, 1): [2, 3], (0, 2): [4] }
677    /// # let mut results = Vec::new();
678    /// # for _ in 0..3 {
679    /// #     results.push(stream.next().await.unwrap());
680    /// # }
681    /// # results.sort();
682    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
683    /// # }));
684    /// # }
685    /// ```
686    pub fn prefix_key<K2, F>(
687        self,
688        f: impl IntoQuotedMut<'a, F, L> + Copy,
689    ) -> KeyedStream<(K2, K), V, L, B, O, R>
690    where
691        F: Fn(&(K, V)) -> K2 + 'a,
692    {
693        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
694        let map_f = q!({
695            let orig = f;
696            move |kv| {
697                let out = orig(&kv);
698                ((out, kv.0), kv.1)
699            }
700        })
701        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
702        .into();
703
704        KeyedStream::new(
705            self.location.clone(),
706            HydroNode::Map {
707                f: map_f,
708                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
709                metadata: self
710                    .location
711                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
712            },
713        )
714    }
715
716    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
717    /// `f`, preserving the order of the elements within the group.
718    ///
719    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
720    /// not modify or take ownership of the values. If you need to modify the values while filtering
721    /// use [`KeyedStream::filter_map`] instead.
722    ///
723    /// # Example
724    /// ```rust
725    /// # #[cfg(feature = "deploy")] {
726    /// # use hydro_lang::prelude::*;
727    /// # use futures::StreamExt;
728    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
729    /// process
730    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
731    ///     .into_keyed()
732    ///     .filter(q!(|&x| x > 2))
733    /// #   .entries()
734    /// # }, |mut stream| async move {
735    /// // { 1: [3], 2: [4] }
736    /// # let mut results = Vec::new();
737    /// # for _ in 0..2 {
738    /// #     results.push(stream.next().await.unwrap());
739    /// # }
740    /// # results.sort();
741    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
742    /// # }));
743    /// # }
744    /// ```
745    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
746    where
747        F: Fn(&V) -> bool + 'a,
748    {
749        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
750        let filter_f = q!({
751            let orig = f;
752            move |t: &(_, _)| orig(&t.1)
753        })
754        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
755        .into();
756
757        KeyedStream::new(
758            self.location.clone(),
759            HydroNode::Filter {
760                f: filter_f,
761                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
762                metadata: self.location.new_node_metadata(Self::collection_kind()),
763            },
764        )
765    }
766
767    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
768    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
769    ///
770    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
771    /// not modify or take ownership of the values. If you need to modify the values while filtering
772    /// use [`KeyedStream::filter_map_with_key`] instead.
773    ///
774    /// # Example
775    /// ```rust
776    /// # #[cfg(feature = "deploy")] {
777    /// # use hydro_lang::prelude::*;
778    /// # use futures::StreamExt;
779    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780    /// process
781    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
782    ///     .into_keyed()
783    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
784    /// #   .entries()
785    /// # }, |mut stream| async move {
786    /// // { 1: [3], 2: [4] }
787    /// # let mut results = Vec::new();
788    /// # for _ in 0..2 {
789    /// #     results.push(stream.next().await.unwrap());
790    /// # }
791    /// # results.sort();
792    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
793    /// # }));
794    /// # }
795    /// ```
796    pub fn filter_with_key<F>(
797        self,
798        f: impl IntoQuotedMut<'a, F, L> + Copy,
799    ) -> KeyedStream<K, V, L, B, O, R>
800    where
801        F: Fn(&(K, V)) -> bool + 'a,
802    {
803        let filter_f = f
804            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
805            .into();
806
807        KeyedStream::new(
808            self.location.clone(),
809            HydroNode::Filter {
810                f: filter_f,
811                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
812                metadata: self.location.new_node_metadata(Self::collection_kind()),
813            },
814        )
815    }
816
817    /// An operator that both filters and maps each value, with keys staying the same.
818    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
819    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
820    ///
821    /// # Example
822    /// ```rust
823    /// # #[cfg(feature = "deploy")] {
824    /// # use hydro_lang::prelude::*;
825    /// # use futures::StreamExt;
826    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
827    /// process
828    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
829    ///     .into_keyed()
830    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
831    /// #   .entries()
832    /// # }, |mut stream| async move {
833    /// // { 1: [2], 2: [4] }
834    /// # let mut results = Vec::new();
835    /// # for _ in 0..2 {
836    /// #     results.push(stream.next().await.unwrap());
837    /// # }
838    /// # results.sort();
839    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
840    /// # }));
841    /// # }
842    /// ```
843    pub fn filter_map<U, F>(
844        self,
845        f: impl IntoQuotedMut<'a, F, L> + Copy,
846    ) -> KeyedStream<K, U, L, B, O, R>
847    where
848        F: Fn(V) -> Option<U> + 'a,
849    {
850        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
851        let filter_map_f = q!({
852            let orig = f;
853            move |(k, v)| orig(v).map(|o| (k, o))
854        })
855        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
856        .into();
857
858        KeyedStream::new(
859            self.location.clone(),
860            HydroNode::FilterMap {
861                f: filter_map_f,
862                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
863                metadata: self
864                    .location
865                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
866            },
867        )
868    }
869
870    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
871    /// re-grouped even they are tuples; instead they will be grouped under the original key.
872    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
873    ///
874    /// # Example
875    /// ```rust
876    /// # #[cfg(feature = "deploy")] {
877    /// # use hydro_lang::prelude::*;
878    /// # use futures::StreamExt;
879    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880    /// process
881    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
882    ///     .into_keyed()
883    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
884    /// #   .entries()
885    /// # }, |mut stream| async move {
886    /// // { 2: [2] }
887    /// # let mut results = Vec::new();
888    /// # for _ in 0..1 {
889    /// #     results.push(stream.next().await.unwrap());
890    /// # }
891    /// # results.sort();
892    /// # assert_eq!(results, vec![(2, 2)]);
893    /// # }));
894    /// # }
895    /// ```
896    pub fn filter_map_with_key<U, F>(
897        self,
898        f: impl IntoQuotedMut<'a, F, L> + Copy,
899    ) -> KeyedStream<K, U, L, B, O, R>
900    where
901        F: Fn((K, V)) -> Option<U> + 'a,
902        K: Clone,
903    {
904        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
905        let filter_map_f = q!({
906            let orig = f;
907            move |(k, v)| {
908                let out = orig((Clone::clone(&k), v));
909                out.map(|o| (k, o))
910            }
911        })
912        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
913        .into();
914
915        KeyedStream::new(
916            self.location.clone(),
917            HydroNode::FilterMap {
918                f: filter_map_f,
919                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
920                metadata: self
921                    .location
922                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
923            },
924        )
925    }
926
927    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
928    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
929    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
930    ///
931    /// # Example
932    /// ```rust
933    /// # #[cfg(feature = "deploy")] {
934    /// # use hydro_lang::prelude::*;
935    /// # use futures::StreamExt;
936    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
937    /// let tick = process.tick();
938    /// let batch = process
939    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
940    ///   .into_keyed()
941    ///   .batch(&tick, nondet!(/** test */));
942    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
943    /// batch.cross_singleton(count).all_ticks().entries()
944    /// # }, |mut stream| async move {
945    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
946    /// # let mut results = Vec::new();
947    /// # for _ in 0..3 {
948    /// #     results.push(stream.next().await.unwrap());
949    /// # }
950    /// # results.sort();
951    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
952    /// # }));
953    /// # }
954    /// ```
955    pub fn cross_singleton<O2>(
956        self,
957        other: impl Into<Optional<O2, L, Bounded>>,
958    ) -> KeyedStream<K, (V, O2), L, B, O, R>
959    where
960        O2: Clone,
961    {
962        let other: Optional<O2, L, Bounded> = other.into();
963        check_matching_location(&self.location, &other.location);
964
965        Stream::new(
966            self.location.clone(),
967            HydroNode::CrossSingleton {
968                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
969                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
970                metadata: self
971                    .location
972                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
973            },
974        )
975        .map(q!(|((k, v), o2)| (k, (v, o2))))
976        .into_keyed()
977    }
978
979    /// For each value `v` in each group, transform `v` using `f` and then treat the
980    /// result as an [`Iterator`] to produce values one by one within the same group.
981    /// The implementation for [`Iterator`] for the output type `I` must produce items
982    /// in a **deterministic** order.
983    ///
984    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
985    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
986    ///
987    /// # Example
988    /// ```rust
989    /// # #[cfg(feature = "deploy")] {
990    /// # use hydro_lang::prelude::*;
991    /// # use futures::StreamExt;
992    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
993    /// process
994    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
995    ///     .into_keyed()
996    ///     .flat_map_ordered(q!(|x| x))
997    /// #   .entries()
998    /// # }, |mut stream| async move {
999    /// // { 1: [2, 3, 4], 2: [5, 6] }
1000    /// # let mut results = Vec::new();
1001    /// # for _ in 0..5 {
1002    /// #     results.push(stream.next().await.unwrap());
1003    /// # }
1004    /// # results.sort();
1005    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1006    /// # }));
1007    /// # }
1008    /// ```
1009    pub fn flat_map_ordered<U, I, F>(
1010        self,
1011        f: impl IntoQuotedMut<'a, F, L> + Copy,
1012    ) -> KeyedStream<K, U, L, B, O, R>
1013    where
1014        I: IntoIterator<Item = U>,
1015        F: Fn(V) -> I + 'a,
1016        K: Clone,
1017    {
1018        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1019        let flat_map_f = q!({
1020            let orig = f;
1021            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1022        })
1023        .splice_fn1_ctx::<(K, V), _>(&self.location)
1024        .into();
1025
1026        KeyedStream::new(
1027            self.location.clone(),
1028            HydroNode::FlatMap {
1029                f: flat_map_f,
1030                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1031                metadata: self
1032                    .location
1033                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1034            },
1035        )
1036    }
1037
1038    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1039    /// for the output type `I` to produce items in any order.
1040    ///
1041    /// # Example
1042    /// ```rust
1043    /// # #[cfg(feature = "deploy")] {
1044    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1045    /// # use futures::StreamExt;
1046    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1047    /// process
1048    ///     .source_iter(q!(vec![
1049    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1050    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1051    ///     ]))
1052    ///     .into_keyed()
1053    ///     .flat_map_unordered(q!(|x| x))
1054    /// #   .entries()
1055    /// # }, |mut stream| async move {
1056    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1057    /// # let mut results = Vec::new();
1058    /// # for _ in 0..4 {
1059    /// #     results.push(stream.next().await.unwrap());
1060    /// # }
1061    /// # results.sort();
1062    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1063    /// # }));
1064    /// # }
1065    /// ```
1066    pub fn flat_map_unordered<U, I, F>(
1067        self,
1068        f: impl IntoQuotedMut<'a, F, L> + Copy,
1069    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1070    where
1071        I: IntoIterator<Item = U>,
1072        F: Fn(V) -> I + 'a,
1073        K: Clone,
1074    {
1075        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1076        let flat_map_f = q!({
1077            let orig = f;
1078            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1079        })
1080        .splice_fn1_ctx::<(K, V), _>(&self.location)
1081        .into();
1082
1083        KeyedStream::new(
1084            self.location.clone(),
1085            HydroNode::FlatMap {
1086                f: flat_map_f,
1087                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1088                metadata: self
1089                    .location
1090                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1091            },
1092        )
1093    }
1094
1095    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1096    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1097    /// items in a **deterministic** order.
1098    ///
1099    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1100    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1101    ///
1102    /// # Example
1103    /// ```rust
1104    /// # #[cfg(feature = "deploy")] {
1105    /// # use hydro_lang::prelude::*;
1106    /// # use futures::StreamExt;
1107    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108    /// process
1109    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1110    ///     .into_keyed()
1111    ///     .flatten_ordered()
1112    /// #   .entries()
1113    /// # }, |mut stream| async move {
1114    /// // { 1: [2, 3, 4], 2: [5, 6] }
1115    /// # let mut results = Vec::new();
1116    /// # for _ in 0..5 {
1117    /// #     results.push(stream.next().await.unwrap());
1118    /// # }
1119    /// # results.sort();
1120    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1121    /// # }));
1122    /// # }
1123    /// ```
1124    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1125    where
1126        V: IntoIterator<Item = U>,
1127        K: Clone,
1128    {
1129        self.flat_map_ordered(q!(|d| d))
1130    }
1131
1132    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1133    /// for the value type `V` to produce items in any order.
1134    ///
1135    /// # Example
1136    /// ```rust
1137    /// # #[cfg(feature = "deploy")] {
1138    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1139    /// # use futures::StreamExt;
1140    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1141    /// process
1142    ///     .source_iter(q!(vec![
1143    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1144    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1145    ///     ]))
1146    ///     .into_keyed()
1147    ///     .flatten_unordered()
1148    /// #   .entries()
1149    /// # }, |mut stream| async move {
1150    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1151    /// # let mut results = Vec::new();
1152    /// # for _ in 0..4 {
1153    /// #     results.push(stream.next().await.unwrap());
1154    /// # }
1155    /// # results.sort();
1156    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1157    /// # }));
1158    /// # }
1159    /// ```
1160    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1161    where
1162        V: IntoIterator<Item = U>,
1163        K: Clone,
1164    {
1165        self.flat_map_unordered(q!(|d| d))
1166    }
1167
1168    /// An operator which allows you to "inspect" each element of a stream without
1169    /// modifying it. The closure `f` is called on a reference to each value. This is
1170    /// mainly useful for debugging, and should not be used to generate side-effects.
1171    ///
1172    /// # Example
1173    /// ```rust
1174    /// # #[cfg(feature = "deploy")] {
1175    /// # use hydro_lang::prelude::*;
1176    /// # use futures::StreamExt;
1177    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1178    /// process
1179    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1180    ///     .into_keyed()
1181    ///     .inspect(q!(|v| println!("{}", v)))
1182    /// #   .entries()
1183    /// # }, |mut stream| async move {
1184    /// # let mut results = Vec::new();
1185    /// # for _ in 0..3 {
1186    /// #     results.push(stream.next().await.unwrap());
1187    /// # }
1188    /// # results.sort();
1189    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1190    /// # }));
1191    /// # }
1192    /// ```
1193    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1194    where
1195        F: Fn(&V) + 'a,
1196    {
1197        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1198        let inspect_f = q!({
1199            let orig = f;
1200            move |t: &(_, _)| orig(&t.1)
1201        })
1202        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1203        .into();
1204
1205        KeyedStream::new(
1206            self.location.clone(),
1207            HydroNode::Inspect {
1208                f: inspect_f,
1209                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1210                metadata: self.location.new_node_metadata(Self::collection_kind()),
1211            },
1212        )
1213    }
1214
1215    /// An operator which allows you to "inspect" each element of a stream without
1216    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1217    /// mainly useful for debugging, and should not be used to generate side-effects.
1218    ///
1219    /// # Example
1220    /// ```rust
1221    /// # #[cfg(feature = "deploy")] {
1222    /// # use hydro_lang::prelude::*;
1223    /// # use futures::StreamExt;
1224    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1225    /// process
1226    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1227    ///     .into_keyed()
1228    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1229    /// #   .entries()
1230    /// # }, |mut stream| async move {
1231    /// # let mut results = Vec::new();
1232    /// # for _ in 0..3 {
1233    /// #     results.push(stream.next().await.unwrap());
1234    /// # }
1235    /// # results.sort();
1236    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1237    /// # }));
1238    /// # }
1239    /// ```
1240    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1241    where
1242        F: Fn(&(K, V)) + 'a,
1243    {
1244        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1245
1246        KeyedStream::new(
1247            self.location.clone(),
1248            HydroNode::Inspect {
1249                f: inspect_f,
1250                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1251                metadata: self.location.new_node_metadata(Self::collection_kind()),
1252            },
1253        )
1254    }
1255
1256    /// An operator which allows you to "name" a `HydroNode`.
1257    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1258    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1259        {
1260            let mut node = self.ir_node.borrow_mut();
1261            let metadata = node.metadata_mut();
1262            metadata.tag = Some(name.to_owned());
1263        }
1264        self
1265    }
1266
1267    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1268    ///
1269    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1270    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1271    /// early by returning `None`.
1272    ///
1273    /// The function takes a mutable reference to the accumulator and the current element, and returns
1274    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1275    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1276    ///
1277    /// # Example
1278    /// ```rust
1279    /// # #[cfg(feature = "deploy")] {
1280    /// # use hydro_lang::prelude::*;
1281    /// # use futures::StreamExt;
1282    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1283    /// process
1284    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1285    ///     .into_keyed()
1286    ///     .scan(
1287    ///         q!(|| 0),
1288    ///         q!(|acc, x| {
1289    ///             *acc += x;
1290    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1291    ///         }),
1292    ///     )
1293    /// #   .entries()
1294    /// # }, |mut stream| async move {
1295    /// // Output: { 0: [1], 1: [3, 7] }
1296    /// # let mut results = Vec::new();
1297    /// # for _ in 0..3 {
1298    /// #     results.push(stream.next().await.unwrap());
1299    /// # }
1300    /// # results.sort();
1301    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1302    /// # }));
1303    /// # }
1304    /// ```
1305    pub fn scan<A, U, I, F>(
1306        self,
1307        init: impl IntoQuotedMut<'a, I, L> + Copy,
1308        f: impl IntoQuotedMut<'a, F, L> + Copy,
1309    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1310    where
1311        O: IsOrdered,
1312        R: IsExactlyOnce,
1313        K: Clone + Eq + Hash,
1314        I: Fn() -> A + 'a,
1315        F: Fn(&mut A, V) -> Option<U> + 'a,
1316    {
1317        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1318        self.make_totally_ordered().make_exactly_once().generator(
1319            init,
1320            q!({
1321                let orig = f;
1322                move |state, v| {
1323                    if let Some(out) = orig(state, v) {
1324                        Generate::Yield(out)
1325                    } else {
1326                        Generate::Break
1327                    }
1328                }
1329            }),
1330        )
1331    }
1332
1333    /// Iteratively processes the elements in each group using a state machine that can yield
1334    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1335    /// syntax in Rust, without requiring special syntax.
1336    ///
1337    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1338    /// state for each group. The second argument defines the processing logic, taking in a
1339    /// mutable reference to the group's state and the value to be processed. It emits a
1340    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1341    /// should be processed.
1342    ///
1343    /// # Example
1344    /// ```rust
1345    /// # #[cfg(feature = "deploy")] {
1346    /// # use hydro_lang::prelude::*;
1347    /// # use futures::StreamExt;
1348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1349    /// process
1350    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1351    ///     .into_keyed()
1352    ///     .generator(
1353    ///         q!(|| 0),
1354    ///         q!(|acc, x| {
1355    ///             *acc += x;
1356    ///             if *acc > 100 {
1357    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1358    ///                     "done!".to_owned()
1359    ///                 )
1360    ///             } else if *acc % 2 == 0 {
1361    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1362    ///                     "even".to_owned()
1363    ///                 )
1364    ///             } else {
1365    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1366    ///             }
1367    ///         }),
1368    ///     )
1369    /// #   .entries()
1370    /// # }, |mut stream| async move {
1371    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1372    /// # let mut results = Vec::new();
1373    /// # for _ in 0..3 {
1374    /// #     results.push(stream.next().await.unwrap());
1375    /// # }
1376    /// # results.sort();
1377    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1378    /// # }));
1379    /// # }
1380    /// ```
1381    pub fn generator<A, U, I, F>(
1382        self,
1383        init: impl IntoQuotedMut<'a, I, L> + Copy,
1384        f: impl IntoQuotedMut<'a, F, L> + Copy,
1385    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1386    where
1387        O: IsOrdered,
1388        R: IsExactlyOnce,
1389        K: Clone + Eq + Hash,
1390        I: Fn() -> A + 'a,
1391        F: Fn(&mut A, V) -> Generate<U> + 'a,
1392    {
1393        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1394        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1395
1396        let this = self.make_totally_ordered().make_exactly_once();
1397
1398        let scan_init = q!(|| HashMap::new())
1399            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1400            .into();
1401        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1402            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1403            if let Some(existing_state_value) = existing_state {
1404                match f(existing_state_value, v) {
1405                    Generate::Yield(out) => Some(Some((k, out))),
1406                    Generate::Return(out) => {
1407                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1408                        Some(Some((k, out)))
1409                    }
1410                    Generate::Break => {
1411                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1412                        Some(None)
1413                    }
1414                    Generate::Continue => Some(None),
1415                }
1416            } else {
1417                Some(None)
1418            }
1419        })
1420        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1421        .into();
1422
1423        let scan_node = HydroNode::Scan {
1424            init: scan_init,
1425            acc: scan_f,
1426            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1427            metadata: this.location.new_node_metadata(Stream::<
1428                Option<(K, U)>,
1429                L,
1430                B,
1431                TotalOrder,
1432                ExactlyOnce,
1433            >::collection_kind()),
1434        };
1435
1436        let flatten_f = q!(|d| d)
1437            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1438            .into();
1439        let flatten_node = HydroNode::FlatMap {
1440            f: flatten_f,
1441            input: Box::new(scan_node),
1442            metadata: this.location.new_node_metadata(KeyedStream::<
1443                K,
1444                U,
1445                L,
1446                B,
1447                TotalOrder,
1448                ExactlyOnce,
1449            >::collection_kind()),
1450        };
1451
1452        KeyedStream::new(this.location.clone(), flatten_node)
1453    }
1454
1455    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1456    /// in-order across the values in each group. But the aggregation function returns a boolean,
1457    /// which when true indicates that the aggregated result is complete and can be released to
1458    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1459    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1460    /// normal stream elements.
1461    ///
1462    /// # Example
1463    /// ```rust
1464    /// # #[cfg(feature = "deploy")] {
1465    /// # use hydro_lang::prelude::*;
1466    /// # use futures::StreamExt;
1467    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1468    /// process
1469    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1470    ///     .into_keyed()
1471    ///     .fold_early_stop(
1472    ///         q!(|| 0),
1473    ///         q!(|acc, x| {
1474    ///             *acc += x;
1475    ///             x % 2 == 0
1476    ///         }),
1477    ///     )
1478    /// #   .entries()
1479    /// # }, |mut stream| async move {
1480    /// // Output: { 0: 2, 1: 9 }
1481    /// # let mut results = Vec::new();
1482    /// # for _ in 0..2 {
1483    /// #     results.push(stream.next().await.unwrap());
1484    /// # }
1485    /// # results.sort();
1486    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1487    /// # }));
1488    /// # }
1489    /// ```
1490    pub fn fold_early_stop<A, I, F>(
1491        self,
1492        init: impl IntoQuotedMut<'a, I, L> + Copy,
1493        f: impl IntoQuotedMut<'a, F, L> + Copy,
1494    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1495    where
1496        O: IsOrdered,
1497        R: IsExactlyOnce,
1498        K: Clone + Eq + Hash,
1499        I: Fn() -> A + 'a,
1500        F: Fn(&mut A, V) -> bool + 'a,
1501    {
1502        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1503        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1504        let out_without_bound_cast = self.generator(
1505            q!(move || Some(init())),
1506            q!(move |key_state, v| {
1507                if let Some(key_state_value) = key_state.as_mut() {
1508                    if f(key_state_value, v) {
1509                        Generate::Return(key_state.take().unwrap())
1510                    } else {
1511                        Generate::Continue
1512                    }
1513                } else {
1514                    unreachable!()
1515                }
1516            }),
1517        );
1518
1519        KeyedSingleton::new(
1520            out_without_bound_cast.location.clone(),
1521            HydroNode::Cast {
1522                inner: Box::new(
1523                    out_without_bound_cast
1524                        .ir_node
1525                        .replace(HydroNode::Placeholder),
1526                ),
1527                metadata: out_without_bound_cast
1528                    .location
1529                    .new_node_metadata(
1530                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1531                    ),
1532            },
1533        )
1534    }
1535
1536    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1537    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1538    /// otherwise the first element would be non-deterministic.
1539    ///
1540    /// # Example
1541    /// ```rust
1542    /// # #[cfg(feature = "deploy")] {
1543    /// # use hydro_lang::prelude::*;
1544    /// # use futures::StreamExt;
1545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1546    /// process
1547    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1548    ///     .into_keyed()
1549    ///     .first()
1550    /// #   .entries()
1551    /// # }, |mut stream| async move {
1552    /// // Output: { 0: 2, 1: 3 }
1553    /// # let mut results = Vec::new();
1554    /// # for _ in 0..2 {
1555    /// #     results.push(stream.next().await.unwrap());
1556    /// # }
1557    /// # results.sort();
1558    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1559    /// # }));
1560    /// # }
1561    /// ```
1562    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1563    where
1564        O: IsOrdered,
1565        R: IsExactlyOnce,
1566        K: Clone + Eq + Hash,
1567    {
1568        self.fold_early_stop(
1569            q!(|| None),
1570            q!(|acc, v| {
1571                *acc = Some(v);
1572                true
1573            }),
1574        )
1575        .map(q!(|v| v.unwrap()))
1576    }
1577
1578    /// Assigns a zero-based index to each value within each key group, emitting
1579    /// `(K, (index, V))` tuples with per-key sequential indices.
1580    ///
1581    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1582    /// This is a streaming operator that processes elements as they arrive.
1583    ///
1584    /// # Example
1585    /// ```rust
1586    /// # #[cfg(feature = "deploy")] {
1587    /// # use hydro_lang::prelude::*;
1588    /// # use futures::StreamExt;
1589    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1590    /// process
1591    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1592    ///     .into_keyed()
1593    ///     .enumerate()
1594    /// # .entries()
1595    /// # }, |mut stream| async move {
1596    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1597    /// # let mut results = Vec::new();
1598    /// # for _ in 0..3 {
1599    /// #     results.push(stream.next().await.unwrap());
1600    /// # }
1601    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1602    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1603    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1604    /// # assert_eq!(key2, vec![(0, 20)]);
1605    /// # }));
1606    /// # }
1607    /// ```
1608    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1609    where
1610        O: IsOrdered,
1611        R: IsExactlyOnce,
1612        K: Eq + Hash + Clone,
1613    {
1614        self.scan(
1615            q!(|| 0),
1616            q!(|acc, next| {
1617                let curr = *acc;
1618                *acc += 1;
1619                Some((curr, next))
1620            }),
1621        )
1622    }
1623
1624    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1625    ///
1626    /// # Example
1627    /// ```rust
1628    /// # #[cfg(feature = "deploy")] {
1629    /// # use hydro_lang::prelude::*;
1630    /// # use futures::StreamExt;
1631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632    /// let tick = process.tick();
1633    /// let numbers = process
1634    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1635    ///     .into_keyed();
1636    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1637    /// batch
1638    ///     .value_counts()
1639    ///     .entries()
1640    ///     .all_ticks()
1641    /// # }, |mut stream| async move {
1642    /// // (1, 3), (2, 2)
1643    /// # let mut results = Vec::new();
1644    /// # for _ in 0..2 {
1645    /// #     results.push(stream.next().await.unwrap());
1646    /// # }
1647    /// # results.sort();
1648    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1649    /// # }));
1650    /// # }
1651    /// ```
1652    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1653    where
1654        R: IsExactlyOnce,
1655        K: Eq + Hash,
1656    {
1657        self.make_exactly_once()
1658            .assume_ordering_trusted(
1659                nondet!(/** ordering within each group affects neither result nor intermediates */),
1660            )
1661            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1662    }
1663
1664    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1665    /// group via the `comb` closure.
1666    ///
1667    /// Depending on the input stream guarantees, the closure may need to be commutative
1668    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1669    ///
1670    /// If the input and output value types are the same and do not require initialization then use
1671    /// [`KeyedStream::reduce`].
1672    ///
1673    /// # Example
1674    /// ```rust
1675    /// # #[cfg(feature = "deploy")] {
1676    /// # use hydro_lang::prelude::*;
1677    /// # use futures::StreamExt;
1678    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679    /// let tick = process.tick();
1680    /// let numbers = process
1681    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1682    ///     .into_keyed();
1683    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1684    /// batch
1685    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1686    ///     .entries()
1687    ///     .all_ticks()
1688    /// # }, |mut stream| async move {
1689    /// // (1, false), (2, true)
1690    /// # let mut results = Vec::new();
1691    /// # for _ in 0..2 {
1692    /// #     results.push(stream.next().await.unwrap());
1693    /// # }
1694    /// # results.sort();
1695    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1696    /// # }));
1697    /// # }
1698    /// ```
1699    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1700        self,
1701        init: impl IntoQuotedMut<'a, I, L>,
1702        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1703    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1704    where
1705        K: Eq + Hash,
1706        C: ValidCommutativityFor<O>,
1707        Idemp: ValidIdempotenceFor<R>,
1708    {
1709        let init = init.splice_fn0_ctx(&self.location).into();
1710        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1711        proof.register_proof(&comb);
1712
1713        let ordered = self
1714            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1715            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1716
1717        KeyedSingleton::new(
1718            ordered.location.clone(),
1719            HydroNode::FoldKeyed {
1720                init,
1721                acc: comb.into(),
1722                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1723                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1724                    K,
1725                    A,
1726                    L,
1727                    B::WhenValueUnbounded,
1728                >::collection_kind()),
1729            },
1730        )
1731    }
1732
1733    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1734    /// group via the `comb` closure.
1735    ///
1736    /// Depending on the input stream guarantees, the closure may need to be commutative
1737    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1738    ///
1739    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1740    ///
1741    /// # Example
1742    /// ```rust
1743    /// # #[cfg(feature = "deploy")] {
1744    /// # use hydro_lang::prelude::*;
1745    /// # use futures::StreamExt;
1746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1747    /// let tick = process.tick();
1748    /// let numbers = process
1749    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1750    ///     .into_keyed();
1751    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1752    /// batch
1753    ///     .reduce(q!(|acc, x| *acc |= x))
1754    ///     .entries()
1755    ///     .all_ticks()
1756    /// # }, |mut stream| async move {
1757    /// // (1, false), (2, true)
1758    /// # let mut results = Vec::new();
1759    /// # for _ in 0..2 {
1760    /// #     results.push(stream.next().await.unwrap());
1761    /// # }
1762    /// # results.sort();
1763    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1764    /// # }));
1765    /// # }
1766    /// ```
1767    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1768        self,
1769        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1770    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1771    where
1772        K: Eq + Hash,
1773        C: ValidCommutativityFor<O>,
1774        Idemp: ValidIdempotenceFor<R>,
1775    {
1776        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1777        proof.register_proof(&f);
1778
1779        let ordered = self
1780            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1781            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1782
1783        KeyedSingleton::new(
1784            ordered.location.clone(),
1785            HydroNode::ReduceKeyed {
1786                f: f.into(),
1787                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1788                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1789                    K,
1790                    V,
1791                    L,
1792                    B::WhenValueUnbounded,
1793                >::collection_kind()),
1794            },
1795        )
1796    }
1797
1798    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1799    /// are automatically deleted.
1800    ///
1801    /// Depending on the input stream guarantees, the closure may need to be commutative
1802    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1803    ///
1804    /// # Example
1805    /// ```rust
1806    /// # #[cfg(feature = "deploy")] {
1807    /// # use hydro_lang::prelude::*;
1808    /// # use futures::StreamExt;
1809    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1810    /// let tick = process.tick();
1811    /// let watermark = tick.singleton(q!(2));
1812    /// let numbers = process
1813    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1814    ///     .into_keyed();
1815    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1816    /// batch
1817    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1818    ///     .entries()
1819    ///     .all_ticks()
1820    /// # }, |mut stream| async move {
1821    /// // (2, true)
1822    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1823    /// # }));
1824    /// # }
1825    /// ```
1826    pub fn reduce_watermark<O2, F, C, Idemp>(
1827        self,
1828        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1829        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1830    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1831    where
1832        K: Eq + Hash,
1833        O2: Clone,
1834        F: Fn(&mut V, V) + 'a,
1835        C: ValidCommutativityFor<O>,
1836        Idemp: ValidIdempotenceFor<R>,
1837    {
1838        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1839        check_matching_location(&self.location.root(), other.location.outer());
1840        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1841        proof.register_proof(&f);
1842
1843        let ordered = self
1844            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1845            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1846
1847        KeyedSingleton::new(
1848            ordered.location.clone(),
1849            HydroNode::ReduceKeyedWatermark {
1850                f: f.into(),
1851                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1852                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1853                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1854                    K,
1855                    V,
1856                    L,
1857                    B::WhenValueUnbounded,
1858                >::collection_kind()),
1859            },
1860        )
1861    }
1862
1863    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1864    /// whose keys are not in the bounded stream.
1865    ///
1866    /// # Example
1867    /// ```rust
1868    /// # #[cfg(feature = "deploy")] {
1869    /// # use hydro_lang::prelude::*;
1870    /// # use futures::StreamExt;
1871    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1872    /// let tick = process.tick();
1873    /// let keyed_stream = process
1874    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1875    ///     .batch(&tick, nondet!(/** test */))
1876    ///     .into_keyed();
1877    /// let keys_to_remove = process
1878    ///     .source_iter(q!(vec![1, 2]))
1879    ///     .batch(&tick, nondet!(/** test */));
1880    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1881    /// #   .entries()
1882    /// # }, |mut stream| async move {
1883    /// // { 3: ['c'], 4: ['d'] }
1884    /// # let mut results = Vec::new();
1885    /// # for _ in 0..2 {
1886    /// #     results.push(stream.next().await.unwrap());
1887    /// # }
1888    /// # results.sort();
1889    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1890    /// # }));
1891    /// # }
1892    /// ```
1893    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1894        self,
1895        other: Stream<K, L, Bounded, O2, R2>,
1896    ) -> Self
1897    where
1898        K: Eq + Hash,
1899    {
1900        check_matching_location(&self.location, &other.location);
1901
1902        KeyedStream::new(
1903            self.location.clone(),
1904            HydroNode::AntiJoin {
1905                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1906                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1907                metadata: self.location.new_node_metadata(Self::collection_kind()),
1908            },
1909        )
1910    }
1911
1912    /// Emit a keyed stream containing keys shared between two keyed streams,
1913    /// where each value in the output keyed stream is a tuple of
1914    /// (self's value, other's value).
1915    /// If there are multiple values for the same key, this performs a cross product
1916    /// for each matching key.
1917    ///
1918    /// # Example
1919    /// ```rust
1920    /// # #[cfg(feature = "deploy")] {
1921    /// # use hydro_lang::prelude::*;
1922    /// # use futures::StreamExt;
1923    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1924    /// let tick = process.tick();
1925    /// let keyed_data = process
1926    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1927    ///     .into_keyed()
1928    ///     .batch(&tick, nondet!(/** test */));
1929    /// let other_data = process
1930    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1931    ///     .into_keyed()
1932    ///     .batch(&tick, nondet!(/** test */));
1933    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1934    /// # }, |mut stream| async move {
1935    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1936    /// # let mut results = vec![];
1937    /// # for _ in 0..4 {
1938    /// #     results.push(stream.next().await.unwrap());
1939    /// # }
1940    /// # results.sort();
1941    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1942    /// # }));
1943    /// # }
1944    /// ```
1945    pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1946        self,
1947        other: KeyedStream<K, V2, L, B, O2, R2>,
1948    ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1949    where
1950        K: Eq + Hash,
1951        R: MinRetries<R2>,
1952    {
1953        self.entries().join(other.entries()).into_keyed()
1954    }
1955
1956    /// Deduplicates values within each key group, emitting each unique value per key
1957    /// exactly once.
1958    ///
1959    /// # Example
1960    /// ```rust
1961    /// # #[cfg(feature = "deploy")] {
1962    /// # use hydro_lang::prelude::*;
1963    /// # use futures::StreamExt;
1964    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1965    /// process
1966    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
1967    ///     .into_keyed()
1968    ///     .unique()
1969    /// # .entries()
1970    /// # }, |mut stream| async move {
1971    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
1972    /// # let mut results = Vec::new();
1973    /// # for _ in 0..4 {
1974    /// #     results.push(stream.next().await.unwrap());
1975    /// # }
1976    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1977    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1978    /// # key1.sort();
1979    /// # key2.sort();
1980    /// # assert_eq!(key1, vec![10, 20]);
1981    /// # assert_eq!(key2, vec![20, 30]);
1982    /// # }));
1983    /// # }
1984    /// ```
1985    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
1986    where
1987        K: Eq + Hash + Clone,
1988        V: Eq + Hash + Clone,
1989    {
1990        self.entries().unique().into_keyed()
1991    }
1992
1993    /// Sorts the values within each key group in ascending order.
1994    ///
1995    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
1996    /// each group. This operator will block until all elements in the input stream
1997    /// are available, so it requires the input stream to be [`Bounded`].
1998    ///
1999    /// # Example
2000    /// ```rust
2001    /// # #[cfg(feature = "deploy")] {
2002    /// # use hydro_lang::prelude::*;
2003    /// # use futures::StreamExt;
2004    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2005    /// let tick = process.tick();
2006    /// let numbers = process
2007    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2008    ///     .into_keyed();
2009    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2010    /// batch.sort().all_ticks()
2011    /// # .entries()
2012    /// # }, |mut stream| async move {
2013    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2014    /// # let mut results = Vec::new();
2015    /// # for _ in 0..4 {
2016    /// #     results.push(stream.next().await.unwrap());
2017    /// # }
2018    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2019    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2020    /// # assert_eq!(key1_vals, vec![1, 3]);
2021    /// # assert_eq!(key2_vals, vec![1, 2]);
2022    /// # }));
2023    /// # }
2024    /// ```
2025    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2026    where
2027        B: IsBounded,
2028        K: Ord,
2029        V: Ord,
2030    {
2031        self.entries().sort().into_keyed()
2032    }
2033
2034    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2035    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2036    /// is only present in one of the inputs, its values are passed through as-is). The output has
2037    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2038    ///
2039    /// Currently, both input streams must be [`Bounded`]. This operator will block
2040    /// on the first stream until all its elements are available. In a future version,
2041    /// we will relax the requirement on the `other` stream.
2042    ///
2043    /// # Example
2044    /// ```rust
2045    /// # #[cfg(feature = "deploy")] {
2046    /// # use hydro_lang::prelude::*;
2047    /// # use futures::StreamExt;
2048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2049    /// let tick = process.tick();
2050    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2051    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2052    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2053    /// # .entries()
2054    /// # }, |mut stream| async move {
2055    /// // { 0: [2, 1], 1: [4, 3] }
2056    /// # let mut results = Vec::new();
2057    /// # for _ in 0..4 {
2058    /// #     results.push(stream.next().await.unwrap());
2059    /// # }
2060    /// # results.sort();
2061    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2062    /// # }));
2063    /// # }
2064    /// ```
2065    pub fn chain<O2: Ordering, R2: Retries>(
2066        self,
2067        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2068    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2069    where
2070        B: IsBounded,
2071        O: MinOrder<O2>,
2072        R: MinRetries<R2>,
2073    {
2074        let this = self.make_bounded();
2075        check_matching_location(&this.location, &other.location);
2076
2077        KeyedStream::new(
2078            this.location.clone(),
2079            HydroNode::Chain {
2080                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2081                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2082                metadata: this.location.new_node_metadata(KeyedStream::<
2083                    K,
2084                    V,
2085                    L,
2086                    Bounded,
2087                    <O as MinOrder<O2>>::Min,
2088                    <R as MinRetries<R2>>::Min,
2089                >::collection_kind()),
2090            },
2091        )
2092    }
2093
2094    /// Emit a keyed stream containing keys shared between the keyed stream and the
2095    /// keyed singleton, where each value in the output keyed stream is a tuple of
2096    /// (the keyed stream's value, the keyed singleton's value).
2097    ///
2098    /// # Example
2099    /// ```rust
2100    /// # #[cfg(feature = "deploy")] {
2101    /// # use hydro_lang::prelude::*;
2102    /// # use futures::StreamExt;
2103    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2104    /// let tick = process.tick();
2105    /// let keyed_data = process
2106    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2107    ///     .into_keyed()
2108    ///     .batch(&tick, nondet!(/** test */));
2109    /// let singleton_data = process
2110    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2111    ///     .into_keyed()
2112    ///     .batch(&tick, nondet!(/** test */))
2113    ///     .first();
2114    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2115    /// # }, |mut stream| async move {
2116    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2117    /// # let mut results = vec![];
2118    /// # for _ in 0..3 {
2119    /// #     results.push(stream.next().await.unwrap());
2120    /// # }
2121    /// # results.sort();
2122    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2123    /// # }));
2124    /// # }
2125    /// ```
2126    pub fn join_keyed_singleton<V2: Clone>(
2127        self,
2128        keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2129    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2130    where
2131        B: IsBounded,
2132        K: Eq + Hash,
2133    {
2134        keyed_singleton
2135            .join_keyed_stream(self.make_bounded())
2136            .map(q!(|(v2, v)| (v, v2)))
2137    }
2138
2139    /// Gets the values associated with a specific key from the keyed stream.
2140    /// Returns an empty stream if the key is `None` or there are no associated values.
2141    ///
2142    /// # Example
2143    /// ```rust
2144    /// # #[cfg(feature = "deploy")] {
2145    /// # use hydro_lang::prelude::*;
2146    /// # use futures::StreamExt;
2147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2148    /// let tick = process.tick();
2149    /// let keyed_data = process
2150    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2151    ///     .into_keyed()
2152    ///     .batch(&tick, nondet!(/** test */));
2153    /// let key = tick.singleton(q!(1));
2154    /// keyed_data.get(key).all_ticks()
2155    /// # }, |mut stream| async move {
2156    /// // 10, 11 in any order
2157    /// # let mut results = vec![];
2158    /// # for _ in 0..2 {
2159    /// #     results.push(stream.next().await.unwrap());
2160    /// # }
2161    /// # results.sort();
2162    /// # assert_eq!(results, vec![10, 11]);
2163    /// # }));
2164    /// # }
2165    /// ```
2166    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2167    where
2168        B: IsBounded,
2169        K: Eq + Hash,
2170    {
2171        self.make_bounded()
2172            .entries()
2173            .join(key.into().into_stream().map(q!(|k| (k, ()))))
2174            .map(q!(|(_, (v, _))| v))
2175    }
2176
2177    /// For each value in `self`, find the matching key in `lookup`.
2178    /// The output is a keyed stream with the key from `self`, and a value
2179    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2180    /// If the key is not present in `lookup`, the option will be [`None`].
2181    ///
2182    /// # Example
2183    /// ```rust
2184    /// # #[cfg(feature = "deploy")] {
2185    /// # use hydro_lang::prelude::*;
2186    /// # use futures::StreamExt;
2187    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2188    /// # let tick = process.tick();
2189    /// let requests = // { 1: [10, 11], 2: 20 }
2190    /// # process
2191    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2192    /// #     .into_keyed()
2193    /// #     .batch(&tick, nondet!(/** test */));
2194    /// let other_data = // { 10: 100, 11: 110 }
2195    /// # process
2196    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2197    /// #     .into_keyed()
2198    /// #     .batch(&tick, nondet!(/** test */))
2199    /// #     .first();
2200    /// requests.lookup_keyed_singleton(other_data)
2201    /// # .entries().all_ticks()
2202    /// # }, |mut stream| async move {
2203    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2204    /// # let mut results = vec![];
2205    /// # for _ in 0..3 {
2206    /// #     results.push(stream.next().await.unwrap());
2207    /// # }
2208    /// # results.sort();
2209    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2210    /// # }));
2211    /// # }
2212    /// ```
2213    pub fn lookup_keyed_singleton<V2>(
2214        self,
2215        lookup: KeyedSingleton<V, V2, L, Bounded>,
2216    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2217    where
2218        B: IsBounded,
2219        K: Eq + Hash + Clone,
2220        V: Eq + Hash + Clone,
2221        V2: Clone,
2222    {
2223        self.lookup_keyed_stream(
2224            lookup
2225                .into_keyed_stream()
2226                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2227        )
2228    }
2229
2230    /// For each value in `self`, find the matching key in `lookup`.
2231    /// The output is a keyed stream with the key from `self`, and a value
2232    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2233    /// If the key is not present in `lookup`, the option will be [`None`].
2234    ///
2235    /// # Example
2236    /// ```rust
2237    /// # #[cfg(feature = "deploy")] {
2238    /// # use hydro_lang::prelude::*;
2239    /// # use futures::StreamExt;
2240    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2241    /// # let tick = process.tick();
2242    /// let requests = // { 1: [10, 11], 2: 20 }
2243    /// # process
2244    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2245    /// #     .into_keyed()
2246    /// #     .batch(&tick, nondet!(/** test */));
2247    /// let other_data = // { 10: [100, 101], 11: 110 }
2248    /// # process
2249    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2250    /// #     .into_keyed()
2251    /// #     .batch(&tick, nondet!(/** test */));
2252    /// requests.lookup_keyed_stream(other_data)
2253    /// # .entries().all_ticks()
2254    /// # }, |mut stream| async move {
2255    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2256    /// # let mut results = vec![];
2257    /// # for _ in 0..4 {
2258    /// #     results.push(stream.next().await.unwrap());
2259    /// # }
2260    /// # results.sort();
2261    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2262    /// # }));
2263    /// # }
2264    /// ```
2265    #[expect(clippy::type_complexity, reason = "retries propagation")]
2266    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2267        self,
2268        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2269    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2270    where
2271        B: IsBounded,
2272        K: Eq + Hash + Clone,
2273        V: Eq + Hash + Clone,
2274        V2: Clone,
2275        R: MinRetries<R2>,
2276    {
2277        let inverted = self
2278            .make_bounded()
2279            .entries()
2280            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2281            .into_keyed();
2282        let found = inverted
2283            .clone()
2284            .join_keyed_stream(lookup.clone())
2285            .entries()
2286            .map(q!(|(lookup_value, (key, value))| (
2287                key,
2288                (lookup_value, Some(value))
2289            )))
2290            .into_keyed();
2291        let not_found = inverted
2292            .filter_key_not_in(lookup.keys())
2293            .entries()
2294            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2295            .into_keyed();
2296
2297        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2298    }
2299
2300    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2301    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2302    ///
2303    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2304    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2305    /// argument that declares where the stream will be atomically processed. Batching a stream into
2306    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2307    /// [`Tick`] will introduce asynchrony.
2308    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2309        let out_location = Atomic { tick: tick.clone() };
2310        KeyedStream::new(
2311            out_location.clone(),
2312            HydroNode::BeginAtomic {
2313                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2314                metadata: out_location
2315                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2316            },
2317        )
2318    }
2319
2320    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2321    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2322    /// the order of the input.
2323    ///
2324    /// # Non-Determinism
2325    /// The batch boundaries are non-deterministic and may change across executions.
2326    pub fn batch(
2327        self,
2328        tick: &Tick<L>,
2329        nondet: NonDet,
2330    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2331        let _ = nondet;
2332        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2333        KeyedStream::new(
2334            tick.clone(),
2335            HydroNode::Batch {
2336                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2337                metadata: tick.new_node_metadata(
2338                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2339                ),
2340            },
2341        )
2342    }
2343}
2344
2345impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2346    KeyedStream<(K1, K2), V, L, B, O, R>
2347{
2348    /// Produces a new keyed stream by dropping the first element of the compound key.
2349    ///
2350    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2351    /// of the values under the new keys. The values across groups with the same new key
2352    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2353    ///
2354    /// # Example
2355    /// ```rust
2356    /// # #[cfg(feature = "deploy")] {
2357    /// # use hydro_lang::prelude::*;
2358    /// # use futures::StreamExt;
2359    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2360    /// process
2361    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2362    ///     .into_keyed()
2363    ///     .drop_key_prefix()
2364    /// #   .entries()
2365    /// # }, |mut stream| async move {
2366    /// // { 10: [2, 3], 20: [4] }
2367    /// # let mut results = Vec::new();
2368    /// # for _ in 0..3 {
2369    /// #     results.push(stream.next().await.unwrap());
2370    /// # }
2371    /// # results.sort();
2372    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2373    /// # }));
2374    /// # }
2375    /// ```
2376    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2377        self.entries()
2378            .map(q!(|((_k1, k2), v)| (k2, v)))
2379            .into_keyed()
2380    }
2381}
2382
2383impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2384    KeyedStream<K, V, L, Unbounded, O, R>
2385{
2386    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2387    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2388    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2389    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2390    ///
2391    /// Currently, both input streams must be [`Unbounded`].
2392    ///
2393    /// # Example
2394    /// ```rust
2395    /// # #[cfg(feature = "deploy")] {
2396    /// # use hydro_lang::prelude::*;
2397    /// # use futures::StreamExt;
2398    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2399    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2400    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2401    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2402    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2403    /// numbers1.merge_unordered(numbers2)
2404    /// #   .entries()
2405    /// # }, |mut stream| async move {
2406    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2407    /// # let mut results = Vec::new();
2408    /// # for _ in 0..4 {
2409    /// #     results.push(stream.next().await.unwrap());
2410    /// # }
2411    /// # results.sort();
2412    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2413    /// # }));
2414    /// # }
2415    /// ```
2416    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2417        self,
2418        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2419    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2420    where
2421        R: MinRetries<R2>,
2422    {
2423        KeyedStream::new(
2424            self.location.clone(),
2425            HydroNode::Chain {
2426                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2427                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2428                metadata: self.location.new_node_metadata(KeyedStream::<
2429                    K,
2430                    V,
2431                    L,
2432                    Unbounded,
2433                    NoOrder,
2434                    <R as MinRetries<R2>>::Min,
2435                >::collection_kind()),
2436            },
2437        )
2438    }
2439
2440    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2441    #[deprecated(note = "use `merge_unordered` instead")]
2442    pub fn interleave<O2: Ordering, R2: Retries>(
2443        self,
2444        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2445    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2446    where
2447        R: MinRetries<R2>,
2448    {
2449        self.merge_unordered(other)
2450    }
2451}
2452
2453impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2454where
2455    L: Location<'a> + NoTick,
2456{
2457    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2458    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2459    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2460    /// used to create the atomic section.
2461    ///
2462    /// # Non-Determinism
2463    /// The batch boundaries are non-deterministic and may change across executions.
2464    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2465        let _ = nondet;
2466        KeyedStream::new(
2467            self.location.clone().tick,
2468            HydroNode::Batch {
2469                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2470                metadata: self.location.tick.new_node_metadata(KeyedStream::<
2471                    K,
2472                    V,
2473                    Tick<L>,
2474                    Bounded,
2475                    O,
2476                    R,
2477                >::collection_kind(
2478                )),
2479            },
2480        )
2481    }
2482
2483    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2484    /// See [`KeyedStream::atomic`] for more details.
2485    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2486        KeyedStream::new(
2487            self.location.tick.l.clone(),
2488            HydroNode::EndAtomic {
2489                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2490                metadata: self
2491                    .location
2492                    .tick
2493                    .l
2494                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2495            },
2496        )
2497    }
2498}
2499
2500impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2501where
2502    L: Location<'a>,
2503{
2504    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2505    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2506    /// each key.
2507    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2508        KeyedStream::new(
2509            self.location.outer().clone(),
2510            HydroNode::YieldConcat {
2511                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2512                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2513                    K,
2514                    V,
2515                    L,
2516                    Unbounded,
2517                    O,
2518                    R,
2519                >::collection_kind(
2520                )),
2521            },
2522        )
2523    }
2524
2525    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2526    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2527    /// each key.
2528    ///
2529    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2530    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2531    /// stream's [`Tick`] context.
2532    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2533        let out_location = Atomic {
2534            tick: self.location.clone(),
2535        };
2536
2537        KeyedStream::new(
2538            out_location.clone(),
2539            HydroNode::YieldConcat {
2540                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2541                metadata: out_location.new_node_metadata(KeyedStream::<
2542                    K,
2543                    V,
2544                    Atomic<L>,
2545                    Unbounded,
2546                    O,
2547                    R,
2548                >::collection_kind()),
2549            },
2550        )
2551    }
2552
2553    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2554    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2555    ///
2556    /// This API is particularly useful for stateful computation on batches of data, such as
2557    /// maintaining an accumulated state that is up to date with the current batch.
2558    ///
2559    /// # Example
2560    /// ```rust
2561    /// # #[cfg(feature = "deploy")] {
2562    /// # use hydro_lang::prelude::*;
2563    /// # use futures::StreamExt;
2564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2565    /// let tick = process.tick();
2566    /// # // ticks are lazy by default, forces the second tick to run
2567    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2568    /// # let batch_first_tick = process
2569    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2570    /// #   .into_keyed()
2571    /// #   .batch(&tick, nondet!(/** test */));
2572    /// # let batch_second_tick = process
2573    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2574    /// #   .into_keyed()
2575    /// #   .batch(&tick, nondet!(/** test */))
2576    /// #   .defer_tick(); // appears on the second tick
2577    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2578    ///
2579    /// input.batch(&tick, nondet!(/** test */))
2580    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2581    ///         *sum += new;
2582    ///     }))).entries().all_ticks()
2583    /// # }, |mut stream| async move {
2584    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2585    /// # let mut results = Vec::new();
2586    /// # for _ in 0..4 {
2587    /// #     results.push(stream.next().await.unwrap());
2588    /// # }
2589    /// # results.sort();
2590    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2591    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2592    /// # results.clear();
2593    /// # for _ in 0..4 {
2594    /// #     results.push(stream.next().await.unwrap());
2595    /// # }
2596    /// # results.sort();
2597    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2598    /// # }));
2599    /// # }
2600    /// ```
2601    pub fn across_ticks<Out: BatchAtomic>(
2602        self,
2603        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2604    ) -> Out::Batched {
2605        thunk(self.all_ticks_atomic()).batched_atomic()
2606    }
2607
2608    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2609    /// tick `T` always has the entries of `self` at tick `T - 1`.
2610    ///
2611    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2612    ///
2613    /// This operator enables stateful iterative processing with ticks, by sending data from one
2614    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2615    ///
2616    /// # Example
2617    /// ```rust
2618    /// # #[cfg(feature = "deploy")] {
2619    /// # use hydro_lang::prelude::*;
2620    /// # use futures::StreamExt;
2621    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2622    /// let tick = process.tick();
2623    /// # // ticks are lazy by default, forces the second tick to run
2624    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2625    /// # let batch_first_tick = process
2626    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2627    /// #   .batch(&tick, nondet!(/** test */))
2628    /// #   .into_keyed();
2629    /// # let batch_second_tick = process
2630    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2631    /// #   .batch(&tick, nondet!(/** test */))
2632    /// #   .defer_tick()
2633    /// #   .into_keyed(); // appears on the second tick
2634    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2635    /// # batch_first_tick.chain(batch_second_tick);
2636    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2637    ///     changes_across_ticks // from the current tick
2638    /// )
2639    /// # .entries().all_ticks()
2640    /// # }, |mut stream| async move {
2641    /// // First tick: { 1: [2, 3] }
2642    /// # let mut results = Vec::new();
2643    /// # for _ in 0..2 {
2644    /// #     results.push(stream.next().await.unwrap());
2645    /// # }
2646    /// # results.sort();
2647    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2648    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2649    /// # results.clear();
2650    /// # for _ in 0..4 {
2651    /// #     results.push(stream.next().await.unwrap());
2652    /// # }
2653    /// # results.sort();
2654    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2655    /// // Third tick: { 1: [4], 2: [5] }
2656    /// # results.clear();
2657    /// # for _ in 0..2 {
2658    /// #     results.push(stream.next().await.unwrap());
2659    /// # }
2660    /// # results.sort();
2661    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2662    /// # }));
2663    /// # }
2664    /// ```
2665    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2666        KeyedStream::new(
2667            self.location.clone(),
2668            HydroNode::DeferTick {
2669                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2670                metadata: self.location.new_node_metadata(KeyedStream::<
2671                    K,
2672                    V,
2673                    Tick<L>,
2674                    Bounded,
2675                    O,
2676                    R,
2677                >::collection_kind()),
2678            },
2679        )
2680    }
2681}
2682
2683#[cfg(test)]
2684mod tests {
2685    #[cfg(feature = "deploy")]
2686    use futures::{SinkExt, StreamExt};
2687    #[cfg(feature = "deploy")]
2688    use hydro_deploy::Deployment;
2689    #[cfg(any(feature = "deploy", feature = "sim"))]
2690    use stageleft::q;
2691
2692    #[cfg(any(feature = "deploy", feature = "sim"))]
2693    use crate::compile::builder::FlowBuilder;
2694    #[cfg(feature = "deploy")]
2695    use crate::live_collections::stream::ExactlyOnce;
2696    #[cfg(feature = "sim")]
2697    use crate::live_collections::stream::{NoOrder, TotalOrder};
2698    #[cfg(any(feature = "deploy", feature = "sim"))]
2699    use crate::location::Location;
2700    #[cfg(any(feature = "deploy", feature = "sim"))]
2701    use crate::nondet::nondet;
2702    #[cfg(feature = "deploy")]
2703    use crate::properties::manual_proof;
2704
    /// Deploys a single process that reduces an unbounded keyed source with `reduce_watermark`
    /// under a constant watermark of 2, snapshots the result each tick, and asserts that the
    /// first entry received is key 2 with its two values summed (102 + 102 = 204).
    /// Presumably keys below the watermark (0 and 1) are filtered out — that is what the test
    /// name suggests; the assertion itself only checks the first received entry.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Tick-scoped singleton holding the constant watermark value 2.
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2749
    /// Same scenario as `reduce_watermark_filter`, but the source is a bounded `source_iter`.
    /// The reduced keyed collection is therefore sent out directly via `.entries()`, with no
    /// `snapshot`/`all_ticks`. Asserts that the first entry received is `(2, 204)`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Tick-scoped singleton holding the constant watermark value 2.
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2787
    /// Exercises `reduce_watermark` with a watermark that advances over time. The watermark
    /// starts at 2 and is incremented through a tick cycle. An extra `(3, 103)` entry is
    /// injected only in ticks where an external trigger message has arrived. The test first
    /// expects `(2, 204)`; after sending a trigger, it expects `(3, 103)`. The name suggests
    /// this checks that per-key state is garbage-collected as the watermark advances — TODO
    /// confirm, since the assertions only observe these two entries.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // External input used purely as a trigger; its `()` payload carries no data.
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark begins at 2; each tick feeds `watermark + 1` into the next tick.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // Emits (3, 103) only in ticks whose batch of trigger messages is non-empty.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first()
                    .is_some(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .merge_unordered(tick_triggered_input)
            .into_keyed()
            // The merged input is `NoOrder` per key, so the reducer is annotated as commutative.
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Trigger the tick that injects (3, 103).
        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
2859
2860    #[cfg(feature = "sim")]
2861    #[test]
2862    #[should_panic]
2863    fn sim_batch_nondet_size() {
2864        let mut flow = FlowBuilder::new();
2865        let node = flow.process::<()>();
2866
2867        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2868
2869        let tick = node.tick();
2870        let out_recv = input
2871            .batch(&tick, nondet!(/** test */))
2872            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2873            .entries()
2874            .all_ticks()
2875            .sim_output();
2876
2877        flow.sim().exhaustive(async || {
2878            out_recv
2879                .assert_yields_only_unordered([(1, vec![1, 2])])
2880                .await;
2881        });
2882    }
2883
    /// Exhaustively simulates every batching of three totally-ordered keyed inputs, re-flattened
    /// with `all_ticks`, and folds each key with a running max that stops once it reaches 2.
    /// Checks the final outputs `(1, 2)` and `(2, 3)`, and that exactly 8 instances are
    /// explored. The unordered variant (`sim_batch_unordered_shuffles`) explores 13, because
    /// there the order within key 1 can also vary.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2918
    /// Same input as `sim_batch_preserves_group_order`, but first weakened to `NoOrder`. The
    /// simulator must then also explore both orders of key 1's values whenever they land in
    /// different batches. Checks that all three entries are emitted and that exactly 13
    /// instances are explored (see the breakdown below).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
2949
    /// Unordered keyed input is batched and then forced to `TotalOrder` with `assume_ordering`.
    /// The test expects the simulator to find an instance where the first value observed for a
    /// key is not the one the assertion expects (`1` for both keys), so the test panics.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }
2975
    /// Counts how many instances the exhaustive simulator explores when unordered keyed input is
    /// batched and then given an assumed `TotalOrder`. The outputs themselves are collected and
    /// discarded; only the instance count (104) is asserted.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
2999
    /// Applies `assume_ordering::<TotalOrder>` to an unordered keyed stream outside any tick,
    /// then accumulates values per key until two have been seen. Checks that one entry per key
    /// is produced and that the simulator explores exactly 24 instances.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }
3036
    /// Feeds a top-level `assume_ordering` output back into its own input: each value `v` is
    /// recycled as `v + 1` when that result is odd. Requires at least one explored instance in
    /// which key 1 observes `[0, 1]`, i.e. the cycled-back `1` is released between the two
    /// original inputs. Also checks that exactly 6 instances are explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Note: 2 + 1 = 3 is also odd, so the input 2 cycles back as well.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }
3096
    /// Cycles a value observed under one key back into a *different* key: seeing `(1, 10)`
    /// emits `(2, 100)`. Requires an instance where key 2's first observed value is the
    /// cycled-back `100`, ahead of the inputs `(2, 20)`/`(2, 21)` that were already pending.
    /// Also checks that exactly 60 instances are explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }
3170
3171    #[cfg(feature = "sim")]
3172    #[test]
3173    fn sim_top_level_assume_ordering_cycle_back_tick() {
3174        use std::collections::HashMap;
3175
3176        let mut flow = FlowBuilder::new();
3177        let node = flow.process::<()>();
3178
3179        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3180
3181        let (complete_cycle_back, cycle_back) =
3182            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3183        let ordered = input
3184            .into_keyed()
3185            .merge_unordered(cycle_back)
3186            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3187        complete_cycle_back.complete(
3188            ordered
3189                .clone()
3190                .batch(&node.tick(), nondet!(/** test */))
3191                .all_ticks()
3192                .map(q!(|v| v + 1))
3193                .filter(q!(|v| v % 2 == 1)),
3194        );
3195
3196        let out_recv = ordered
3197            .fold_early_stop(
3198                q!(|| Vec::new()),
3199                q!(|acc, v| {
3200                    acc.push(v);
3201                    acc.len() >= 2
3202                }),
3203            )
3204            .entries()
3205            .sim_output();
3206
3207        let mut saw = false;
3208        let instance_count = flow.sim().exhaustive(async || {
3209            in_send.send_many_unordered([(1, 0), (1, 2)]);
3210            let out: HashMap<_, _> = out_recv
3211                .collect_sorted::<Vec<_>>()
3212                .await
3213                .into_iter()
3214                .collect();
3215
3216            if let Some(values) = out.get(&1)
3217                && *values == vec![0, 1]
3218            {
3219                saw = true;
3220            }
3221        });
3222
3223        assert!(
3224            saw,
3225            "did not see an instance with key 1 having [0, 1] in order"
3226        );
3227        assert_eq!(instance_count, 58);
3228    }
3229}