Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location at which this collection is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this collection. Operators consume it by swapping in
    /// `HydroNode::Placeholder`, which the `Drop` impl treats as "already consumed".
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the builder state; `Drop` uses it to register a still-unconsumed node.
    pub(crate) flow_state: FlowState,

    /// Carries the type parameters, none of which are stored by value.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
/// Exposes the keyed stream's tick-deferral through the [`DeferTick`] trait, for bounded keyed
/// streams that live inside a [`Tick`].
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards to the inherent `KeyedStream::defer_tick` (assumed to be defined elsewhere in
        // this file). Path resolution prefers inherent associated functions over trait methods,
        // so this is not a call back into this trait method.
        KeyedStream::defer_tick(self)
    }
}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a> + NoTick,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a> + NoTick,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant makes two independent choices: whether an element is emitted, and whether the
/// generator keeps processing later inputs.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Explicitly "casts" the keyed stream to a type with a different ordering
292    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
293    /// by the type-system.
294    ///
295    /// # Non-Determinism
296    /// This function is used as an escape hatch, and any mistakes in the
297    /// provided ordering guarantee will propagate into the guarantees
298    /// for the rest of the program.
299    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
300        if O::ORDERING_KIND == O2::ORDERING_KIND {
301            KeyedStream::new(
302                self.location.clone(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
306            // We can always weaken the ordering guarantee
307            KeyedStream::new(
308                self.location.clone(),
309                HydroNode::Cast {
310                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
311                    metadata: self
312                        .location
313                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
314                },
315            )
316        } else {
317            KeyedStream::new(
318                self.location.clone(),
319                HydroNode::ObserveNonDet {
320                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
321                    trusted: false,
322                    metadata: self
323                        .location
324                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
325                },
326            )
327        }
328    }
329
330    fn assume_ordering_trusted<O2: Ordering>(
331        self,
332        _nondet: NonDet,
333    ) -> KeyedStream<K, V, L, B, O2, R> {
334        if O::ORDERING_KIND == O2::ORDERING_KIND {
335            KeyedStream::new(
336                self.location.clone(),
337                self.ir_node.replace(HydroNode::Placeholder),
338            )
339        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
340            // We can always weaken the ordering guarantee
341            KeyedStream::new(
342                self.location.clone(),
343                HydroNode::Cast {
344                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
345                    metadata: self
346                        .location
347                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
348                },
349            )
350        } else {
351            KeyedStream::new(
352                self.location.clone(),
353                HydroNode::ObserveNonDet {
354                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
355                    trusted: true,
356                    metadata: self
357                        .location
358                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
359                },
360            )
361        }
362    }
363
364    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
365    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
366    /// which is always safe because that is the weakest possible guarantee.
367    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
368        self.weaken_ordering::<NoOrder>()
369    }
370
371    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
372    /// enforcing that `O2` is weaker than the input ordering guarantee.
373    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
374        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
375        self.assume_ordering::<O2>(nondet)
376    }
377
378    /// Explicitly "casts" the keyed stream to a type with a different retries
379    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
380    /// be proven by the type-system.
381    ///
382    /// # Non-Determinism
383    /// This function is used as an escape hatch, and any mistakes in the
384    /// provided retries guarantee will propagate into the guarantees
385    /// for the rest of the program.
386    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
387        if R::RETRIES_KIND == R2::RETRIES_KIND {
388            KeyedStream::new(
389                self.location.clone(),
390                self.ir_node.replace(HydroNode::Placeholder),
391            )
392        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
393            // We can always weaken the retries guarantee
394            KeyedStream::new(
395                self.location.clone(),
396                HydroNode::Cast {
397                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
398                    metadata: self
399                        .location
400                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
401                },
402            )
403        } else {
404            KeyedStream::new(
405                self.location.clone(),
406                HydroNode::ObserveNonDet {
407                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                    trusted: false,
409                    metadata: self
410                        .location
411                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
412                },
413            )
414        }
415    }
416
417    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
418    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
419    /// which is always safe because that is the weakest possible guarantee.
420    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
421        self.weaken_retries::<AtLeastOnce>()
422    }
423
424    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
425    /// enforcing that `R2` is weaker than the input retries guarantee.
426    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
427        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
428        self.assume_retries::<R2>(nondet)
429    }
430
431    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
432    /// implies that `O == TotalOrder`.
433    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
434    where
435        O: IsOrdered,
436    {
437        self.assume_ordering(nondet!(/** no-op */))
438    }
439
440    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
441    /// implies that `R == ExactlyOnce`.
442    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
443    where
444        R: IsExactlyOnce,
445    {
446        self.assume_retries(nondet!(/** no-op */))
447    }
448
449    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
450    /// implies that `B == Bounded`.
451    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
452    where
453        B: IsBounded,
454    {
455        KeyedStream::new(
456            self.location.clone(),
457            self.ir_node.replace(HydroNode::Placeholder),
458        )
459    }
460
461    /// Flattens the keyed stream into an unordered stream of key-value pairs.
462    ///
463    /// # Example
464    /// ```rust
465    /// # #[cfg(feature = "deploy")] {
466    /// # use hydro_lang::prelude::*;
467    /// # use futures::StreamExt;
468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469    /// process
470    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
471    ///     .into_keyed()
472    ///     .entries()
473    /// # }, |mut stream| async move {
474    /// // (1, 2), (1, 3), (2, 4) in any order
475    /// # let mut results = Vec::new();
476    /// # for _ in 0..3 {
477    /// #     results.push(stream.next().await.unwrap());
478    /// # }
479    /// # results.sort();
480    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
481    /// # }));
482    /// # }
483    /// ```
484    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
485        Stream::new(
486            self.location.clone(),
487            HydroNode::Cast {
488                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
489                metadata: self
490                    .location
491                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
492            },
493        )
494    }
495
496    /// Flattens the keyed stream into an unordered stream of only the values.
497    ///
498    /// # Example
499    /// ```rust
500    /// # #[cfg(feature = "deploy")] {
501    /// # use hydro_lang::prelude::*;
502    /// # use futures::StreamExt;
503    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
504    /// process
505    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
506    ///     .into_keyed()
507    ///     .values()
508    /// # }, |mut stream| async move {
509    /// // 2, 3, 4 in any order
510    /// # let mut results = Vec::new();
511    /// # for _ in 0..3 {
512    /// #     results.push(stream.next().await.unwrap());
513    /// # }
514    /// # results.sort();
515    /// # assert_eq!(results, vec![2, 3, 4]);
516    /// # }));
517    /// # }
518    /// ```
519    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
520        self.entries().map(q!(|(_, v)| v))
521    }
522
523    /// Flattens the keyed stream into an unordered stream of just the keys.
524    ///
525    /// # Example
526    /// ```rust
527    /// # #[cfg(feature = "deploy")] {
528    /// # use hydro_lang::prelude::*;
529    /// # use futures::StreamExt;
530    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
531    /// # process
532    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
533    /// #     .into_keyed()
534    /// #     .keys()
535    /// # }, |mut stream| async move {
536    /// // 1, 2 in any order
537    /// # let mut results = Vec::new();
538    /// # for _ in 0..2 {
539    /// #     results.push(stream.next().await.unwrap());
540    /// # }
541    /// # results.sort();
542    /// # assert_eq!(results, vec![1, 2]);
543    /// # }));
544    /// # }
545    /// ```
546    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
547    where
548        K: Eq + Hash,
549    {
550        self.entries().map(q!(|(k, _)| k)).unique()
551    }
552
553    /// Transforms each value by invoking `f` on each element, with keys staying the same
554    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
555    ///
556    /// If you do not want to modify the stream and instead only want to view
557    /// each item use [`KeyedStream::inspect`] instead.
558    ///
559    /// # Example
560    /// ```rust
561    /// # #[cfg(feature = "deploy")] {
562    /// # use hydro_lang::prelude::*;
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565    /// process
566    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
567    ///     .into_keyed()
568    ///     .map(q!(|v| v + 1))
569    /// #   .entries()
570    /// # }, |mut stream| async move {
571    /// // { 1: [3, 4], 2: [5] }
572    /// # let mut results = Vec::new();
573    /// # for _ in 0..3 {
574    /// #     results.push(stream.next().await.unwrap());
575    /// # }
576    /// # results.sort();
577    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
578    /// # }));
579    /// # }
580    /// ```
581    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
582    where
583        F: Fn(V) -> U + 'a,
584    {
585        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
586        let map_f = q!({
587            let orig = f;
588            move |(k, v)| (k, orig(v))
589        })
590        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
591        .into();
592
593        KeyedStream::new(
594            self.location.clone(),
595            HydroNode::Map {
596                f: map_f,
597                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
598                metadata: self
599                    .location
600                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
601            },
602        )
603    }
604
605    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
606    /// re-grouped even they are tuples; instead they will be grouped under the original key.
607    ///
608    /// If you do not want to modify the stream and instead only want to view
609    /// each item use [`KeyedStream::inspect_with_key`] instead.
610    ///
611    /// # Example
612    /// ```rust
613    /// # #[cfg(feature = "deploy")] {
614    /// # use hydro_lang::prelude::*;
615    /// # use futures::StreamExt;
616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617    /// process
618    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
619    ///     .into_keyed()
620    ///     .map_with_key(q!(|(k, v)| k + v))
621    /// #   .entries()
622    /// # }, |mut stream| async move {
623    /// // { 1: [3, 4], 2: [6] }
624    /// # let mut results = Vec::new();
625    /// # for _ in 0..3 {
626    /// #     results.push(stream.next().await.unwrap());
627    /// # }
628    /// # results.sort();
629    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
630    /// # }));
631    /// # }
632    /// ```
633    pub fn map_with_key<U, F>(
634        self,
635        f: impl IntoQuotedMut<'a, F, L> + Copy,
636    ) -> KeyedStream<K, U, L, B, O, R>
637    where
638        F: Fn((K, V)) -> U + 'a,
639        K: Clone,
640    {
641        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
642        let map_f = q!({
643            let orig = f;
644            move |(k, v)| {
645                let out = orig((Clone::clone(&k), v));
646                (k, out)
647            }
648        })
649        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
650        .into();
651
652        KeyedStream::new(
653            self.location.clone(),
654            HydroNode::Map {
655                f: map_f,
656                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657                metadata: self
658                    .location
659                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
660            },
661        )
662    }
663
664    /// Prepends a new value to the key of each element in the stream, producing a new
665    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
666    /// occurs and the elements in each group preserve their original order.
667    ///
668    /// # Example
669    /// ```rust
670    /// # #[cfg(feature = "deploy")] {
671    /// # use hydro_lang::prelude::*;
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674    /// process
675    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
676    ///     .into_keyed()
677    ///     .prefix_key(q!(|&(k, _)| k % 2))
678    /// #   .entries()
679    /// # }, |mut stream| async move {
680    /// // { (1, 1): [2, 3], (0, 2): [4] }
681    /// # let mut results = Vec::new();
682    /// # for _ in 0..3 {
683    /// #     results.push(stream.next().await.unwrap());
684    /// # }
685    /// # results.sort();
686    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
687    /// # }));
688    /// # }
689    /// ```
690    pub fn prefix_key<K2, F>(
691        self,
692        f: impl IntoQuotedMut<'a, F, L> + Copy,
693    ) -> KeyedStream<(K2, K), V, L, B, O, R>
694    where
695        F: Fn(&(K, V)) -> K2 + 'a,
696    {
697        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
698        let map_f = q!({
699            let orig = f;
700            move |kv| {
701                let out = orig(&kv);
702                ((out, kv.0), kv.1)
703            }
704        })
705        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
706        .into();
707
708        KeyedStream::new(
709            self.location.clone(),
710            HydroNode::Map {
711                f: map_f,
712                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
713                metadata: self
714                    .location
715                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
716            },
717        )
718    }
719
720    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
721    /// `f`, preserving the order of the elements within the group.
722    ///
723    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
724    /// not modify or take ownership of the values. If you need to modify the values while filtering
725    /// use [`KeyedStream::filter_map`] instead.
726    ///
727    /// # Example
728    /// ```rust
729    /// # #[cfg(feature = "deploy")] {
730    /// # use hydro_lang::prelude::*;
731    /// # use futures::StreamExt;
732    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733    /// process
734    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
735    ///     .into_keyed()
736    ///     .filter(q!(|&x| x > 2))
737    /// #   .entries()
738    /// # }, |mut stream| async move {
739    /// // { 1: [3], 2: [4] }
740    /// # let mut results = Vec::new();
741    /// # for _ in 0..2 {
742    /// #     results.push(stream.next().await.unwrap());
743    /// # }
744    /// # results.sort();
745    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
746    /// # }));
747    /// # }
748    /// ```
749    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
750    where
751        F: Fn(&V) -> bool + 'a,
752    {
753        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
754        let filter_f = q!({
755            let orig = f;
756            move |t: &(_, _)| orig(&t.1)
757        })
758        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
759        .into();
760
761        KeyedStream::new(
762            self.location.clone(),
763            HydroNode::Filter {
764                f: filter_f,
765                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
766                metadata: self.location.new_node_metadata(Self::collection_kind()),
767            },
768        )
769    }
770
771    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
772    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
773    ///
774    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
775    /// not modify or take ownership of the values. If you need to modify the values while filtering
776    /// use [`KeyedStream::filter_map_with_key`] instead.
777    ///
778    /// # Example
779    /// ```rust
780    /// # #[cfg(feature = "deploy")] {
781    /// # use hydro_lang::prelude::*;
782    /// # use futures::StreamExt;
783    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
784    /// process
785    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
786    ///     .into_keyed()
787    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
788    /// #   .entries()
789    /// # }, |mut stream| async move {
790    /// // { 1: [3], 2: [4] }
791    /// # let mut results = Vec::new();
792    /// # for _ in 0..2 {
793    /// #     results.push(stream.next().await.unwrap());
794    /// # }
795    /// # results.sort();
796    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
797    /// # }));
798    /// # }
799    /// ```
800    pub fn filter_with_key<F>(
801        self,
802        f: impl IntoQuotedMut<'a, F, L> + Copy,
803    ) -> KeyedStream<K, V, L, B, O, R>
804    where
805        F: Fn(&(K, V)) -> bool + 'a,
806    {
807        let filter_f = f
808            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
809            .into();
810
811        KeyedStream::new(
812            self.location.clone(),
813            HydroNode::Filter {
814                f: filter_f,
815                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
816                metadata: self.location.new_node_metadata(Self::collection_kind()),
817            },
818        )
819    }
820
821    /// An operator that both filters and maps each value, with keys staying the same.
822    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
823    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
824    ///
825    /// # Example
826    /// ```rust
827    /// # #[cfg(feature = "deploy")] {
828    /// # use hydro_lang::prelude::*;
829    /// # use futures::StreamExt;
830    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831    /// process
832    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
833    ///     .into_keyed()
834    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
835    /// #   .entries()
836    /// # }, |mut stream| async move {
837    /// // { 1: [2], 2: [4] }
838    /// # let mut results = Vec::new();
839    /// # for _ in 0..2 {
840    /// #     results.push(stream.next().await.unwrap());
841    /// # }
842    /// # results.sort();
843    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
844    /// # }));
845    /// # }
846    /// ```
847    pub fn filter_map<U, F>(
848        self,
849        f: impl IntoQuotedMut<'a, F, L> + Copy,
850    ) -> KeyedStream<K, U, L, B, O, R>
851    where
852        F: Fn(V) -> Option<U> + 'a,
853    {
854        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
855        let filter_map_f = q!({
856            let orig = f;
857            move |(k, v)| orig(v).map(|o| (k, o))
858        })
859        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
860        .into();
861
862        KeyedStream::new(
863            self.location.clone(),
864            HydroNode::FilterMap {
865                f: filter_map_f,
866                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
867                metadata: self
868                    .location
869                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
870            },
871        )
872    }
873
874    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
875    /// re-grouped even they are tuples; instead they will be grouped under the original key.
876    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
877    ///
878    /// # Example
879    /// ```rust
880    /// # #[cfg(feature = "deploy")] {
881    /// # use hydro_lang::prelude::*;
882    /// # use futures::StreamExt;
883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884    /// process
885    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
886    ///     .into_keyed()
887    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
888    /// #   .entries()
889    /// # }, |mut stream| async move {
890    /// // { 2: [2] }
891    /// # let mut results = Vec::new();
892    /// # for _ in 0..1 {
893    /// #     results.push(stream.next().await.unwrap());
894    /// # }
895    /// # results.sort();
896    /// # assert_eq!(results, vec![(2, 2)]);
897    /// # }));
898    /// # }
899    /// ```
900    pub fn filter_map_with_key<U, F>(
901        self,
902        f: impl IntoQuotedMut<'a, F, L> + Copy,
903    ) -> KeyedStream<K, U, L, B, O, R>
904    where
905        F: Fn((K, V)) -> Option<U> + 'a,
906        K: Clone,
907    {
908        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
909        let filter_map_f = q!({
910            let orig = f;
911            move |(k, v)| {
912                let out = orig((Clone::clone(&k), v));
913                out.map(|o| (k, o))
914            }
915        })
916        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
917        .into();
918
919        KeyedStream::new(
920            self.location.clone(),
921            HydroNode::FilterMap {
922                f: filter_map_f,
923                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
924                metadata: self
925                    .location
926                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
927            },
928        )
929    }
930
931    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
932    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
933    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
934    ///
935    /// # Example
936    /// ```rust
937    /// # #[cfg(feature = "deploy")] {
938    /// # use hydro_lang::prelude::*;
939    /// # use futures::StreamExt;
940    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
941    /// let tick = process.tick();
942    /// let batch = process
943    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
944    ///   .into_keyed()
945    ///   .batch(&tick, nondet!(/** test */));
946    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
947    /// batch.cross_singleton(count).all_ticks().entries()
948    /// # }, |mut stream| async move {
949    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
950    /// # let mut results = Vec::new();
951    /// # for _ in 0..3 {
952    /// #     results.push(stream.next().await.unwrap());
953    /// # }
954    /// # results.sort();
955    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
956    /// # }));
957    /// # }
958    /// ```
959    pub fn cross_singleton<O2>(
960        self,
961        other: impl Into<Optional<O2, L, Bounded>>,
962    ) -> KeyedStream<K, (V, O2), L, B, O, R>
963    where
964        O2: Clone,
965    {
966        let other: Optional<O2, L, Bounded> = other.into();
967        check_matching_location(&self.location, &other.location);
968
969        Stream::new(
970            self.location.clone(),
971            HydroNode::CrossSingleton {
972                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
973                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
974                metadata: self
975                    .location
976                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
977            },
978        )
979        .map(q!(|((k, v), o2)| (k, (v, o2))))
980        .into_keyed()
981    }
982
983    /// For each value `v` in each group, transform `v` using `f` and then treat the
984    /// result as an [`Iterator`] to produce values one by one within the same group.
985    /// The implementation for [`Iterator`] for the output type `I` must produce items
986    /// in a **deterministic** order.
987    ///
988    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
989    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
990    ///
991    /// # Example
992    /// ```rust
993    /// # #[cfg(feature = "deploy")] {
994    /// # use hydro_lang::prelude::*;
995    /// # use futures::StreamExt;
996    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
997    /// process
998    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
999    ///     .into_keyed()
1000    ///     .flat_map_ordered(q!(|x| x))
1001    /// #   .entries()
1002    /// # }, |mut stream| async move {
1003    /// // { 1: [2, 3, 4], 2: [5, 6] }
1004    /// # let mut results = Vec::new();
1005    /// # for _ in 0..5 {
1006    /// #     results.push(stream.next().await.unwrap());
1007    /// # }
1008    /// # results.sort();
1009    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1010    /// # }));
1011    /// # }
1012    /// ```
1013    pub fn flat_map_ordered<U, I, F>(
1014        self,
1015        f: impl IntoQuotedMut<'a, F, L> + Copy,
1016    ) -> KeyedStream<K, U, L, B, O, R>
1017    where
1018        I: IntoIterator<Item = U>,
1019        F: Fn(V) -> I + 'a,
1020        K: Clone,
1021    {
1022        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1023        let flat_map_f = q!({
1024            let orig = f;
1025            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1026        })
1027        .splice_fn1_ctx::<(K, V), _>(&self.location)
1028        .into();
1029
1030        KeyedStream::new(
1031            self.location.clone(),
1032            HydroNode::FlatMap {
1033                f: flat_map_f,
1034                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1035                metadata: self
1036                    .location
1037                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1038            },
1039        )
1040    }
1041
1042    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1043    /// for the output type `I` to produce items in any order.
1044    ///
1045    /// # Example
1046    /// ```rust
1047    /// # #[cfg(feature = "deploy")] {
1048    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1049    /// # use futures::StreamExt;
1050    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1051    /// process
1052    ///     .source_iter(q!(vec![
1053    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1054    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1055    ///     ]))
1056    ///     .into_keyed()
1057    ///     .flat_map_unordered(q!(|x| x))
1058    /// #   .entries()
1059    /// # }, |mut stream| async move {
1060    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1061    /// # let mut results = Vec::new();
1062    /// # for _ in 0..4 {
1063    /// #     results.push(stream.next().await.unwrap());
1064    /// # }
1065    /// # results.sort();
1066    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1067    /// # }));
1068    /// # }
1069    /// ```
1070    pub fn flat_map_unordered<U, I, F>(
1071        self,
1072        f: impl IntoQuotedMut<'a, F, L> + Copy,
1073    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1074    where
1075        I: IntoIterator<Item = U>,
1076        F: Fn(V) -> I + 'a,
1077        K: Clone,
1078    {
1079        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1080        let flat_map_f = q!({
1081            let orig = f;
1082            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1083        })
1084        .splice_fn1_ctx::<(K, V), _>(&self.location)
1085        .into();
1086
1087        KeyedStream::new(
1088            self.location.clone(),
1089            HydroNode::FlatMap {
1090                f: flat_map_f,
1091                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1092                metadata: self
1093                    .location
1094                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1095            },
1096        )
1097    }
1098
1099    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1100    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1101    /// items in a **deterministic** order.
1102    ///
1103    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1104    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1105    ///
1106    /// # Example
1107    /// ```rust
1108    /// # #[cfg(feature = "deploy")] {
1109    /// # use hydro_lang::prelude::*;
1110    /// # use futures::StreamExt;
1111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112    /// process
1113    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1114    ///     .into_keyed()
1115    ///     .flatten_ordered()
1116    /// #   .entries()
1117    /// # }, |mut stream| async move {
1118    /// // { 1: [2, 3, 4], 2: [5, 6] }
1119    /// # let mut results = Vec::new();
1120    /// # for _ in 0..5 {
1121    /// #     results.push(stream.next().await.unwrap());
1122    /// # }
1123    /// # results.sort();
1124    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1125    /// # }));
1126    /// # }
1127    /// ```
1128    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1129    where
1130        V: IntoIterator<Item = U>,
1131        K: Clone,
1132    {
1133        self.flat_map_ordered(q!(|d| d))
1134    }
1135
1136    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1137    /// for the value type `V` to produce items in any order.
1138    ///
1139    /// # Example
1140    /// ```rust
1141    /// # #[cfg(feature = "deploy")] {
1142    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1143    /// # use futures::StreamExt;
1144    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1145    /// process
1146    ///     .source_iter(q!(vec![
1147    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1148    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1149    ///     ]))
1150    ///     .into_keyed()
1151    ///     .flatten_unordered()
1152    /// #   .entries()
1153    /// # }, |mut stream| async move {
1154    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1155    /// # let mut results = Vec::new();
1156    /// # for _ in 0..4 {
1157    /// #     results.push(stream.next().await.unwrap());
1158    /// # }
1159    /// # results.sort();
1160    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1161    /// # }));
1162    /// # }
1163    /// ```
1164    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1165    where
1166        V: IntoIterator<Item = U>,
1167        K: Clone,
1168    {
1169        self.flat_map_unordered(q!(|d| d))
1170    }
1171
1172    /// An operator which allows you to "inspect" each element of a stream without
1173    /// modifying it. The closure `f` is called on a reference to each value. This is
1174    /// mainly useful for debugging, and should not be used to generate side-effects.
1175    ///
1176    /// # Example
1177    /// ```rust
1178    /// # #[cfg(feature = "deploy")] {
1179    /// # use hydro_lang::prelude::*;
1180    /// # use futures::StreamExt;
1181    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1182    /// process
1183    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1184    ///     .into_keyed()
1185    ///     .inspect(q!(|v| println!("{}", v)))
1186    /// #   .entries()
1187    /// # }, |mut stream| async move {
1188    /// # let mut results = Vec::new();
1189    /// # for _ in 0..3 {
1190    /// #     results.push(stream.next().await.unwrap());
1191    /// # }
1192    /// # results.sort();
1193    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1194    /// # }));
1195    /// # }
1196    /// ```
1197    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1198    where
1199        F: Fn(&V) + 'a,
1200    {
1201        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1202        let inspect_f = q!({
1203            let orig = f;
1204            move |t: &(_, _)| orig(&t.1)
1205        })
1206        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1207        .into();
1208
1209        KeyedStream::new(
1210            self.location.clone(),
1211            HydroNode::Inspect {
1212                f: inspect_f,
1213                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214                metadata: self.location.new_node_metadata(Self::collection_kind()),
1215            },
1216        )
1217    }
1218
1219    /// An operator which allows you to "inspect" each element of a stream without
1220    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1221    /// mainly useful for debugging, and should not be used to generate side-effects.
1222    ///
1223    /// # Example
1224    /// ```rust
1225    /// # #[cfg(feature = "deploy")] {
1226    /// # use hydro_lang::prelude::*;
1227    /// # use futures::StreamExt;
1228    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229    /// process
1230    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1231    ///     .into_keyed()
1232    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1233    /// #   .entries()
1234    /// # }, |mut stream| async move {
1235    /// # let mut results = Vec::new();
1236    /// # for _ in 0..3 {
1237    /// #     results.push(stream.next().await.unwrap());
1238    /// # }
1239    /// # results.sort();
1240    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1241    /// # }));
1242    /// # }
1243    /// ```
1244    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1245    where
1246        F: Fn(&(K, V)) + 'a,
1247    {
1248        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1249
1250        KeyedStream::new(
1251            self.location.clone(),
1252            HydroNode::Inspect {
1253                f: inspect_f,
1254                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1255                metadata: self.location.new_node_metadata(Self::collection_kind()),
1256            },
1257        )
1258    }
1259
1260    /// An operator which allows you to "name" a `HydroNode`.
1261    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1262    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1263        {
1264            let mut node = self.ir_node.borrow_mut();
1265            let metadata = node.metadata_mut();
1266            metadata.tag = Some(name.to_owned());
1267        }
1268        self
1269    }
1270
1271    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1272    ///
1273    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1274    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1275    /// early by returning `None`.
1276    ///
1277    /// The function takes a mutable reference to the accumulator and the current element, and returns
1278    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1279    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1280    ///
1281    /// # Example
1282    /// ```rust
1283    /// # #[cfg(feature = "deploy")] {
1284    /// # use hydro_lang::prelude::*;
1285    /// # use futures::StreamExt;
1286    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1287    /// process
1288    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1289    ///     .into_keyed()
1290    ///     .scan(
1291    ///         q!(|| 0),
1292    ///         q!(|acc, x| {
1293    ///             *acc += x;
1294    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1295    ///         }),
1296    ///     )
1297    /// #   .entries()
1298    /// # }, |mut stream| async move {
1299    /// // Output: { 0: [1], 1: [3, 7] }
1300    /// # let mut results = Vec::new();
1301    /// # for _ in 0..3 {
1302    /// #     results.push(stream.next().await.unwrap());
1303    /// # }
1304    /// # results.sort();
1305    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1306    /// # }));
1307    /// # }
1308    /// ```
1309    pub fn scan<A, U, I, F>(
1310        self,
1311        init: impl IntoQuotedMut<'a, I, L> + Copy,
1312        f: impl IntoQuotedMut<'a, F, L> + Copy,
1313    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1314    where
1315        O: IsOrdered,
1316        R: IsExactlyOnce,
1317        K: Clone + Eq + Hash,
1318        I: Fn() -> A + 'a,
1319        F: Fn(&mut A, V) -> Option<U> + 'a,
1320    {
1321        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1322        self.make_totally_ordered().make_exactly_once().generator(
1323            init,
1324            q!({
1325                let orig = f;
1326                move |state, v| {
1327                    if let Some(out) = orig(state, v) {
1328                        Generate::Yield(out)
1329                    } else {
1330                        Generate::Break
1331                    }
1332                }
1333            }),
1334        )
1335    }
1336
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// The initializer is invoked lazily, the first time a value for a given key is seen. After
    /// `f` returns [`Generate::Return`] or [`Generate::Break`] for a key, that key's state is
    /// discarded and any later values for the key are ignored. The input is first converted to
    /// [`TotalOrder`] / [`ExactlyOnce`], and the output carries those same guarantees.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Wrap the user closures so they can be captured inside the quoted code below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Scan accumulator: one entry per key seen so far. `Some(state)` means the group is
        // still active; `None` means it already returned or broke and is ignored from then on.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // Every branch returns `Some(..)`, so the scan as a whole is never asked to stop;
        // termination is tracked per key instead. The inner `Option` is `Some((k, out))` when
        // this input produced an output and `None` when it did not.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // Lazily create the group's state the first time its key appears.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // The group already finished: drop the value without invoking `f`.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // `Option` is iterable, so an identity flat-map drops the `None` placeholders and
        // unwraps the produced `(K, U)` entries.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1458
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. But the aggregation function returns a boolean,
    /// which when true indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// Once `f` returns `true` for a key, that key's accumulated value is emitted and any later
    /// values for the key are ignored. A key for which `f` never returns `true` produces no
    /// entry in the output.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // The per-key generator state is `Option<A>`, so the finished accumulator can be moved
        // out with `take()` when `f` signals completion.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` stops invoking this closure for a key once it returns
                    // `Generate::Return`, so the state is never observed as `None` here.
                    unreachable!()
                }
            }),
        );

        // Each key emits at most one value (the `Generate::Return` above), so a `Cast` node
        // re-types the keyed stream as a `KeyedSingleton` with `B::WithBoundedValue` values.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(
                    out_without_bound_cast
                        .ir_node
                        .replace(HydroNode::Placeholder),
                ),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WithBoundedValue>::collection_kind(),
                    ),
            },
        )
    }
1539
1540    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1541    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1542    /// otherwise the first element would be non-deterministic.
1543    ///
1544    /// # Example
1545    /// ```rust
1546    /// # #[cfg(feature = "deploy")] {
1547    /// # use hydro_lang::prelude::*;
1548    /// # use futures::StreamExt;
1549    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1550    /// process
1551    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1552    ///     .into_keyed()
1553    ///     .first()
1554    /// #   .entries()
1555    /// # }, |mut stream| async move {
1556    /// // Output: { 0: 2, 1: 3 }
1557    /// # let mut results = Vec::new();
1558    /// # for _ in 0..2 {
1559    /// #     results.push(stream.next().await.unwrap());
1560    /// # }
1561    /// # results.sort();
1562    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1563    /// # }));
1564    /// # }
1565    /// ```
1566    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1567    where
1568        O: IsOrdered,
1569        R: IsExactlyOnce,
1570        K: Clone + Eq + Hash,
1571    {
1572        self.fold_early_stop(
1573            q!(|| None),
1574            q!(|acc, v| {
1575                *acc = Some(v);
1576                true
1577            }),
1578        )
1579        .map(q!(|v| v.unwrap()))
1580    }
1581
1582    /// Returns a keyed stream containing at most the first `n` values per key,
1583    /// preserving the original order within each group. Similar to SQL `LIMIT`
1584    /// applied per group.
1585    ///
1586    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1587    /// retries, since the result depends on the order and cardinality of elements
1588    /// within each group.
1589    ///
1590    /// # Example
1591    /// ```rust
1592    /// # #[cfg(feature = "deploy")] {
1593    /// # use hydro_lang::prelude::*;
1594    /// # use futures::StreamExt;
1595    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1596    /// process
1597    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1598    ///     .into_keyed()
1599    ///     .limit(q!(2))
1600    /// #   .entries()
1601    /// # }, |mut stream| async move {
1602    /// // { 1: [10, 20], 2: [40, 50] }
1603    /// # let mut results = Vec::new();
1604    /// # for _ in 0..4 {
1605    /// #     results.push(stream.next().await.unwrap());
1606    /// # }
1607    /// # results.sort();
1608    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1609    /// # }));
1610    /// # }
1611    /// ```
1612    pub fn limit(
1613        self,
1614        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1615    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1616    where
1617        O: IsOrdered,
1618        R: IsExactlyOnce,
1619        K: Clone + Eq + Hash,
1620    {
1621        self.generator(
1622            q!(|| 0usize),
1623            q!(move |count, item| {
1624                if *count == n {
1625                    Generate::Break
1626                } else {
1627                    *count += 1;
1628                    if *count == n {
1629                        Generate::Return(item)
1630                    } else {
1631                        Generate::Yield(item)
1632                    }
1633                }
1634            }),
1635        )
1636    }
1637
1638    /// Assigns a zero-based index to each value within each key group, emitting
1639    /// `(K, (index, V))` tuples with per-key sequential indices.
1640    ///
1641    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1642    /// This is a streaming operator that processes elements as they arrive.
1643    ///
1644    /// # Example
1645    /// ```rust
1646    /// # #[cfg(feature = "deploy")] {
1647    /// # use hydro_lang::prelude::*;
1648    /// # use futures::StreamExt;
1649    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1650    /// process
1651    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1652    ///     .into_keyed()
1653    ///     .enumerate()
1654    /// # .entries()
1655    /// # }, |mut stream| async move {
1656    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1657    /// # let mut results = Vec::new();
1658    /// # for _ in 0..3 {
1659    /// #     results.push(stream.next().await.unwrap());
1660    /// # }
1661    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1662    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1663    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1664    /// # assert_eq!(key2, vec![(0, 20)]);
1665    /// # }));
1666    /// # }
1667    /// ```
1668    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1669    where
1670        O: IsOrdered,
1671        R: IsExactlyOnce,
1672        K: Eq + Hash + Clone,
1673    {
1674        self.scan(
1675            q!(|| 0),
1676            q!(|acc, next| {
1677                let curr = *acc;
1678                *acc += 1;
1679                Some((curr, next))
1680            }),
1681        )
1682    }
1683
1684    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1685    ///
1686    /// # Example
1687    /// ```rust
1688    /// # #[cfg(feature = "deploy")] {
1689    /// # use hydro_lang::prelude::*;
1690    /// # use futures::StreamExt;
1691    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1692    /// let tick = process.tick();
1693    /// let numbers = process
1694    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1695    ///     .into_keyed();
1696    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1697    /// batch
1698    ///     .value_counts()
1699    ///     .entries()
1700    ///     .all_ticks()
1701    /// # }, |mut stream| async move {
1702    /// // (1, 3), (2, 2)
1703    /// # let mut results = Vec::new();
1704    /// # for _ in 0..2 {
1705    /// #     results.push(stream.next().await.unwrap());
1706    /// # }
1707    /// # results.sort();
1708    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1709    /// # }));
1710    /// # }
1711    /// ```
1712    pub fn value_counts(
1713        self,
1714    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1715    where
1716        R: IsExactlyOnce,
1717        K: Eq + Hash,
1718    {
1719        self.make_exactly_once()
1720            .assume_ordering_trusted(
1721                nondet!(/** ordering within each group affects neither result nor intermediates */),
1722            )
1723            .fold(
1724                q!(|| 0),
1725                q!(
1726                    |acc, _| *acc += 1,
1727                    monotone = manual_proof!(/** += 1 is monotonic */)
1728                ),
1729            )
1730    }
1731
1732    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1733    /// group via the `comb` closure.
1734    ///
1735    /// Depending on the input stream guarantees, the closure may need to be commutative
1736    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1737    ///
1738    /// If the input and output value types are the same and do not require initialization then use
1739    /// [`KeyedStream::reduce`].
1740    ///
1741    /// # Example
1742    /// ```rust
1743    /// # #[cfg(feature = "deploy")] {
1744    /// # use hydro_lang::prelude::*;
1745    /// # use futures::StreamExt;
1746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1747    /// let tick = process.tick();
1748    /// let numbers = process
1749    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1750    ///     .into_keyed();
1751    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1752    /// batch
1753    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1754    ///     .entries()
1755    ///     .all_ticks()
1756    /// # }, |mut stream| async move {
1757    /// // (1, false), (2, true)
1758    /// # let mut results = Vec::new();
1759    /// # for _ in 0..2 {
1760    /// #     results.push(stream.next().await.unwrap());
1761    /// # }
1762    /// # results.sort();
1763    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1764    /// # }));
1765    /// # }
1766    /// ```
1767    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1768        self,
1769        init: impl IntoQuotedMut<'a, I, L>,
1770        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1771    ) -> KeyedSingleton<K, A, L, B2>
1772    where
1773        K: Eq + Hash,
1774        C: ValidCommutativityFor<O>,
1775        Idemp: ValidIdempotenceFor<R>,
1776        B: ApplyMonotoneKeyedStream<M, B2>,
1777    {
1778        let init = init.splice_fn0_ctx(&self.location).into();
1779        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1780        proof.register_proof(&comb);
1781
1782        let ordered = self
1783            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1784            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1785
1786        KeyedSingleton::new(
1787            ordered.location.clone(),
1788            HydroNode::FoldKeyed {
1789                init,
1790                acc: comb.into(),
1791                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1792                metadata: ordered
1793                    .location
1794                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1795            },
1796        )
1797    }
1798
1799    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1800    /// group via the `comb` closure.
1801    ///
1802    /// Depending on the input stream guarantees, the closure may need to be commutative
1803    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1804    ///
1805    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1806    ///
1807    /// # Example
1808    /// ```rust
1809    /// # #[cfg(feature = "deploy")] {
1810    /// # use hydro_lang::prelude::*;
1811    /// # use futures::StreamExt;
1812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1813    /// let tick = process.tick();
1814    /// let numbers = process
1815    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1816    ///     .into_keyed();
1817    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1818    /// batch
1819    ///     .reduce(q!(|acc, x| *acc |= x))
1820    ///     .entries()
1821    ///     .all_ticks()
1822    /// # }, |mut stream| async move {
1823    /// // (1, false), (2, true)
1824    /// # let mut results = Vec::new();
1825    /// # for _ in 0..2 {
1826    /// #     results.push(stream.next().await.unwrap());
1827    /// # }
1828    /// # results.sort();
1829    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1830    /// # }));
1831    /// # }
1832    /// ```
1833    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1834        self,
1835        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1836    ) -> KeyedSingleton<K, V, L, B>
1837    where
1838        K: Eq + Hash,
1839        C: ValidCommutativityFor<O>,
1840        Idemp: ValidIdempotenceFor<R>,
1841    {
1842        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1843        proof.register_proof(&f);
1844
1845        let ordered = self
1846            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1847            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1848
1849        KeyedSingleton::new(
1850            ordered.location.clone(),
1851            HydroNode::ReduceKeyed {
1852                f: f.into(),
1853                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1854                metadata: ordered
1855                    .location
1856                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1857            },
1858        )
1859    }
1860
1861    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1862    /// are automatically deleted.
1863    ///
1864    /// Depending on the input stream guarantees, the closure may need to be commutative
1865    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1866    ///
1867    /// # Example
1868    /// ```rust
1869    /// # #[cfg(feature = "deploy")] {
1870    /// # use hydro_lang::prelude::*;
1871    /// # use futures::StreamExt;
1872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1873    /// let tick = process.tick();
1874    /// let watermark = tick.singleton(q!(2));
1875    /// let numbers = process
1876    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1877    ///     .into_keyed();
1878    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1879    /// batch
1880    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1881    ///     .entries()
1882    ///     .all_ticks()
1883    /// # }, |mut stream| async move {
1884    /// // (2, true)
1885    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1886    /// # }));
1887    /// # }
1888    /// ```
1889    pub fn reduce_watermark<O2, F, C, Idemp>(
1890        self,
1891        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1892        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1893    ) -> KeyedSingleton<K, V, L, B>
1894    where
1895        K: Eq + Hash,
1896        O2: Clone,
1897        F: Fn(&mut V, V) + 'a,
1898        C: ValidCommutativityFor<O>,
1899        Idemp: ValidIdempotenceFor<R>,
1900    {
1901        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1902        check_matching_location(&self.location.root(), other.location.outer());
1903        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1904        proof.register_proof(&f);
1905
1906        let ordered = self
1907            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1908            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1909
1910        KeyedSingleton::new(
1911            ordered.location.clone(),
1912            HydroNode::ReduceKeyedWatermark {
1913                f: f.into(),
1914                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1915                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1916                metadata: ordered
1917                    .location
1918                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1919            },
1920        )
1921    }
1922
1923    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1924    /// whose keys are not in the bounded stream.
1925    ///
1926    /// # Example
1927    /// ```rust
1928    /// # #[cfg(feature = "deploy")] {
1929    /// # use hydro_lang::prelude::*;
1930    /// # use futures::StreamExt;
1931    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1932    /// let tick = process.tick();
1933    /// let keyed_stream = process
1934    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1935    ///     .batch(&tick, nondet!(/** test */))
1936    ///     .into_keyed();
1937    /// let keys_to_remove = process
1938    ///     .source_iter(q!(vec![1, 2]))
1939    ///     .batch(&tick, nondet!(/** test */));
1940    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1941    /// #   .entries()
1942    /// # }, |mut stream| async move {
1943    /// // { 3: ['c'], 4: ['d'] }
1944    /// # let mut results = Vec::new();
1945    /// # for _ in 0..2 {
1946    /// #     results.push(stream.next().await.unwrap());
1947    /// # }
1948    /// # results.sort();
1949    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1950    /// # }));
1951    /// # }
1952    /// ```
1953    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1954        self,
1955        other: Stream<K, L, Bounded, O2, R2>,
1956    ) -> Self
1957    where
1958        K: Eq + Hash,
1959    {
1960        check_matching_location(&self.location, &other.location);
1961
1962        KeyedStream::new(
1963            self.location.clone(),
1964            HydroNode::AntiJoin {
1965                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1966                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1967                metadata: self.location.new_node_metadata(Self::collection_kind()),
1968            },
1969        )
1970    }
1971
1972    /// Emit a keyed stream containing keys shared between two keyed streams,
1973    /// where each value in the output keyed stream is a tuple of
1974    /// (self's value, other's value).
1975    /// If there are multiple values for the same key, this performs a cross product
1976    /// for each matching key.
1977    ///
1978    /// # Example
1979    /// ```rust
1980    /// # #[cfg(feature = "deploy")] {
1981    /// # use hydro_lang::prelude::*;
1982    /// # use futures::StreamExt;
1983    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1984    /// let tick = process.tick();
1985    /// let keyed_data = process
1986    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1987    ///     .into_keyed()
1988    ///     .batch(&tick, nondet!(/** test */));
1989    /// let other_data = process
1990    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1991    ///     .into_keyed()
1992    ///     .batch(&tick, nondet!(/** test */));
1993    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1994    /// # }, |mut stream| async move {
1995    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1996    /// # let mut results = vec![];
1997    /// # for _ in 0..4 {
1998    /// #     results.push(stream.next().await.unwrap());
1999    /// # }
2000    /// # results.sort();
2001    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2002    /// # }));
2003    /// # }
2004    /// ```
2005    pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
2006        self,
2007        other: KeyedStream<K, V2, L, B, O2, R2>,
2008    ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2009    where
2010        K: Eq + Hash,
2011        R: MinRetries<R2>,
2012    {
2013        self.entries().join(other.entries()).into_keyed()
2014    }
2015
2016    /// Deduplicates values within each key group, emitting each unique value per key
2017    /// exactly once.
2018    ///
2019    /// # Example
2020    /// ```rust
2021    /// # #[cfg(feature = "deploy")] {
2022    /// # use hydro_lang::prelude::*;
2023    /// # use futures::StreamExt;
2024    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2025    /// process
2026    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2027    ///     .into_keyed()
2028    ///     .unique()
2029    /// # .entries()
2030    /// # }, |mut stream| async move {
2031    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2032    /// # let mut results = Vec::new();
2033    /// # for _ in 0..4 {
2034    /// #     results.push(stream.next().await.unwrap());
2035    /// # }
2036    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2037    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2038    /// # key1.sort();
2039    /// # key2.sort();
2040    /// # assert_eq!(key1, vec![10, 20]);
2041    /// # assert_eq!(key2, vec![20, 30]);
2042    /// # }));
2043    /// # }
2044    /// ```
2045    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2046    where
2047        K: Eq + Hash + Clone,
2048        V: Eq + Hash + Clone,
2049    {
2050        self.entries().unique().into_keyed()
2051    }
2052
2053    /// Sorts the values within each key group in ascending order.
2054    ///
2055    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2056    /// each group. This operator will block until all elements in the input stream
2057    /// are available, so it requires the input stream to be [`Bounded`].
2058    ///
2059    /// # Example
2060    /// ```rust
2061    /// # #[cfg(feature = "deploy")] {
2062    /// # use hydro_lang::prelude::*;
2063    /// # use futures::StreamExt;
2064    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2065    /// let tick = process.tick();
2066    /// let numbers = process
2067    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2068    ///     .into_keyed();
2069    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2070    /// batch.sort().all_ticks()
2071    /// # .entries()
2072    /// # }, |mut stream| async move {
2073    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2074    /// # let mut results = Vec::new();
2075    /// # for _ in 0..4 {
2076    /// #     results.push(stream.next().await.unwrap());
2077    /// # }
2078    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2079    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2080    /// # assert_eq!(key1_vals, vec![1, 3]);
2081    /// # assert_eq!(key2_vals, vec![1, 2]);
2082    /// # }));
2083    /// # }
2084    /// ```
2085    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2086    where
2087        B: IsBounded,
2088        K: Ord,
2089        V: Ord,
2090    {
2091        self.entries().sort().into_keyed()
2092    }
2093
2094    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2095    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2096    /// is only present in one of the inputs, its values are passed through as-is). The output has
2097    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2098    ///
2099    /// Currently, both input streams must be [`Bounded`]. This operator will block
2100    /// on the first stream until all its elements are available. In a future version,
2101    /// we will relax the requirement on the `other` stream.
2102    ///
2103    /// # Example
2104    /// ```rust
2105    /// # #[cfg(feature = "deploy")] {
2106    /// # use hydro_lang::prelude::*;
2107    /// # use futures::StreamExt;
2108    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2109    /// let tick = process.tick();
2110    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2111    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2112    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2113    /// # .entries()
2114    /// # }, |mut stream| async move {
2115    /// // { 0: [2, 1], 1: [4, 3] }
2116    /// # let mut results = Vec::new();
2117    /// # for _ in 0..4 {
2118    /// #     results.push(stream.next().await.unwrap());
2119    /// # }
2120    /// # results.sort();
2121    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2122    /// # }));
2123    /// # }
2124    /// ```
2125    pub fn chain<O2: Ordering, R2: Retries>(
2126        self,
2127        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2128    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2129    where
2130        B: IsBounded,
2131        O: MinOrder<O2>,
2132        R: MinRetries<R2>,
2133    {
2134        let this = self.make_bounded();
2135        check_matching_location(&this.location, &other.location);
2136
2137        KeyedStream::new(
2138            this.location.clone(),
2139            HydroNode::Chain {
2140                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2141                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2142                metadata: this.location.new_node_metadata(KeyedStream::<
2143                    K,
2144                    V,
2145                    L,
2146                    Bounded,
2147                    <O as MinOrder<O2>>::Min,
2148                    <R as MinRetries<R2>>::Min,
2149                >::collection_kind()),
2150            },
2151        )
2152    }
2153
2154    /// Emit a keyed stream containing keys shared between the keyed stream and the
2155    /// keyed singleton, where each value in the output keyed stream is a tuple of
2156    /// (the keyed stream's value, the keyed singleton's value).
2157    ///
2158    /// # Example
2159    /// ```rust
2160    /// # #[cfg(feature = "deploy")] {
2161    /// # use hydro_lang::prelude::*;
2162    /// # use futures::StreamExt;
2163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2164    /// let tick = process.tick();
2165    /// let keyed_data = process
2166    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2167    ///     .into_keyed()
2168    ///     .batch(&tick, nondet!(/** test */));
2169    /// let singleton_data = process
2170    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2171    ///     .into_keyed()
2172    ///     .batch(&tick, nondet!(/** test */))
2173    ///     .first();
2174    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2175    /// # }, |mut stream| async move {
2176    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2177    /// # let mut results = vec![];
2178    /// # for _ in 0..3 {
2179    /// #     results.push(stream.next().await.unwrap());
2180    /// # }
2181    /// # results.sort();
2182    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2183    /// # }));
2184    /// # }
2185    /// ```
2186    pub fn join_keyed_singleton<V2: Clone>(
2187        self,
2188        keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2189    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2190    where
2191        B: IsBounded,
2192        K: Eq + Hash,
2193    {
2194        keyed_singleton
2195            .join_keyed_stream(self.make_bounded())
2196            .map(q!(|(v2, v)| (v, v2)))
2197    }
2198
2199    /// Gets the values associated with a specific key from the keyed stream.
2200    /// Returns an empty stream if the key is `None` or there are no associated values.
2201    ///
2202    /// # Example
2203    /// ```rust
2204    /// # #[cfg(feature = "deploy")] {
2205    /// # use hydro_lang::prelude::*;
2206    /// # use futures::StreamExt;
2207    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2208    /// let tick = process.tick();
2209    /// let keyed_data = process
2210    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2211    ///     .into_keyed()
2212    ///     .batch(&tick, nondet!(/** test */));
2213    /// let key = tick.singleton(q!(1));
2214    /// keyed_data.get(key).all_ticks()
2215    /// # }, |mut stream| async move {
2216    /// // 10, 11 in any order
2217    /// # let mut results = vec![];
2218    /// # for _ in 0..2 {
2219    /// #     results.push(stream.next().await.unwrap());
2220    /// # }
2221    /// # results.sort();
2222    /// # assert_eq!(results, vec![10, 11]);
2223    /// # }));
2224    /// # }
2225    /// ```
2226    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2227    where
2228        B: IsBounded,
2229        K: Eq + Hash,
2230    {
2231        self.make_bounded()
2232            .entries()
2233            .join(key.into().into_stream().map(q!(|k| (k, ()))))
2234            .map(q!(|(_, (v, _))| v))
2235    }
2236
2237    /// For each value in `self`, find the matching key in `lookup`.
2238    /// The output is a keyed stream with the key from `self`, and a value
2239    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2240    /// If the key is not present in `lookup`, the option will be [`None`].
2241    ///
2242    /// # Example
2243    /// ```rust
2244    /// # #[cfg(feature = "deploy")] {
2245    /// # use hydro_lang::prelude::*;
2246    /// # use futures::StreamExt;
2247    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2248    /// # let tick = process.tick();
2249    /// let requests = // { 1: [10, 11], 2: 20 }
2250    /// # process
2251    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2252    /// #     .into_keyed()
2253    /// #     .batch(&tick, nondet!(/** test */));
2254    /// let other_data = // { 10: 100, 11: 110 }
2255    /// # process
2256    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2257    /// #     .into_keyed()
2258    /// #     .batch(&tick, nondet!(/** test */))
2259    /// #     .first();
2260    /// requests.lookup_keyed_singleton(other_data)
2261    /// # .entries().all_ticks()
2262    /// # }, |mut stream| async move {
2263    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2264    /// # let mut results = vec![];
2265    /// # for _ in 0..3 {
2266    /// #     results.push(stream.next().await.unwrap());
2267    /// # }
2268    /// # results.sort();
2269    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2270    /// # }));
2271    /// # }
2272    /// ```
2273    pub fn lookup_keyed_singleton<V2>(
2274        self,
2275        lookup: KeyedSingleton<V, V2, L, Bounded>,
2276    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2277    where
2278        B: IsBounded,
2279        K: Eq + Hash + Clone,
2280        V: Eq + Hash + Clone,
2281        V2: Clone,
2282    {
2283        self.lookup_keyed_stream(
2284            lookup
2285                .into_keyed_stream()
2286                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2287        )
2288    }
2289
2290    /// For each value in `self`, find the matching key in `lookup`.
2291    /// The output is a keyed stream with the key from `self`, and a value
2292    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2293    /// If the key is not present in `lookup`, the option will be [`None`].
2294    ///
2295    /// # Example
2296    /// ```rust
2297    /// # #[cfg(feature = "deploy")] {
2298    /// # use hydro_lang::prelude::*;
2299    /// # use futures::StreamExt;
2300    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2301    /// # let tick = process.tick();
2302    /// let requests = // { 1: [10, 11], 2: 20 }
2303    /// # process
2304    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2305    /// #     .into_keyed()
2306    /// #     .batch(&tick, nondet!(/** test */));
2307    /// let other_data = // { 10: [100, 101], 11: 110 }
2308    /// # process
2309    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2310    /// #     .into_keyed()
2311    /// #     .batch(&tick, nondet!(/** test */));
2312    /// requests.lookup_keyed_stream(other_data)
2313    /// # .entries().all_ticks()
2314    /// # }, |mut stream| async move {
2315    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2316    /// # let mut results = vec![];
2317    /// # for _ in 0..4 {
2318    /// #     results.push(stream.next().await.unwrap());
2319    /// # }
2320    /// # results.sort();
2321    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2322    /// # }));
2323    /// # }
2324    /// ```
2325    #[expect(clippy::type_complexity, reason = "retries propagation")]
2326    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2327        self,
2328        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2329    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2330    where
2331        B: IsBounded,
2332        K: Eq + Hash + Clone,
2333        V: Eq + Hash + Clone,
2334        V2: Clone,
2335        R: MinRetries<R2>,
2336    {
2337        let inverted = self
2338            .make_bounded()
2339            .entries()
2340            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2341            .into_keyed();
2342        let found = inverted
2343            .clone()
2344            .join_keyed_stream(lookup.clone())
2345            .entries()
2346            .map(q!(|(lookup_value, (key, value))| (
2347                key,
2348                (lookup_value, Some(value))
2349            )))
2350            .into_keyed();
2351        let not_found = inverted
2352            .filter_key_not_in(lookup.keys())
2353            .entries()
2354            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2355            .into_keyed();
2356
2357        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2358    }
2359
2360    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2361    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2362    ///
2363    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2364    /// processed before an acknowledgement is emitted.
2365    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2366        let id = self.location.flow_state().borrow_mut().next_clock_id();
2367        let out_location = Atomic {
2368            tick: Tick {
2369                id,
2370                l: self.location.clone(),
2371            },
2372        };
2373        KeyedStream::new(
2374            out_location.clone(),
2375            HydroNode::BeginAtomic {
2376                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2377                metadata: out_location
2378                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2379            },
2380        )
2381    }
2382
2383    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2384    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2385    /// the order of the input.
2386    ///
2387    /// # Non-Determinism
2388    /// The batch boundaries are non-deterministic and may change across executions.
2389    pub fn batch(
2390        self,
2391        tick: &Tick<L>,
2392        nondet: NonDet,
2393    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2394        let _ = nondet;
2395        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2396        KeyedStream::new(
2397            tick.clone(),
2398            HydroNode::Batch {
2399                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2400                metadata: tick.new_node_metadata(
2401                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2402                ),
2403            },
2404        )
2405    }
2406}
2407
2408impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2409    KeyedStream<(K1, K2), V, L, B, O, R>
2410{
2411    /// Produces a new keyed stream by dropping the first element of the compound key.
2412    ///
2413    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2414    /// of the values under the new keys. The values across groups with the same new key
2415    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2416    ///
2417    /// # Example
2418    /// ```rust
2419    /// # #[cfg(feature = "deploy")] {
2420    /// # use hydro_lang::prelude::*;
2421    /// # use futures::StreamExt;
2422    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2423    /// process
2424    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2425    ///     .into_keyed()
2426    ///     .drop_key_prefix()
2427    /// #   .entries()
2428    /// # }, |mut stream| async move {
2429    /// // { 10: [2, 3], 20: [4] }
2430    /// # let mut results = Vec::new();
2431    /// # for _ in 0..3 {
2432    /// #     results.push(stream.next().await.unwrap());
2433    /// # }
2434    /// # results.sort();
2435    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2436    /// # }));
2437    /// # }
2438    /// ```
2439    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2440        self.entries()
2441            .map(q!(|((_k1, k2), v)| (k2, v)))
2442            .into_keyed()
2443    }
2444}
2445
2446impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2447    KeyedStream<K, V, L, Unbounded, O, R>
2448{
2449    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2450    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2451    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2452    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2453    ///
2454    /// Currently, both input streams must be [`Unbounded`].
2455    ///
2456    /// # Example
2457    /// ```rust
2458    /// # #[cfg(feature = "deploy")] {
2459    /// # use hydro_lang::prelude::*;
2460    /// # use futures::StreamExt;
2461    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2462    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2463    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2464    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2465    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2466    /// numbers1.merge_unordered(numbers2)
2467    /// #   .entries()
2468    /// # }, |mut stream| async move {
2469    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2470    /// # let mut results = Vec::new();
2471    /// # for _ in 0..4 {
2472    /// #     results.push(stream.next().await.unwrap());
2473    /// # }
2474    /// # results.sort();
2475    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2476    /// # }));
2477    /// # }
2478    /// ```
2479    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2480        self,
2481        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2482    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2483    where
2484        R: MinRetries<R2>,
2485    {
2486        KeyedStream::new(
2487            self.location.clone(),
2488            HydroNode::Chain {
2489                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2490                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2491                metadata: self.location.new_node_metadata(KeyedStream::<
2492                    K,
2493                    V,
2494                    L,
2495                    Unbounded,
2496                    NoOrder,
2497                    <R as MinRetries<R2>>::Min,
2498                >::collection_kind()),
2499            },
2500        )
2501    }
2502
2503    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2504    #[deprecated(note = "use `merge_unordered` instead")]
2505    pub fn interleave<O2: Ordering, R2: Retries>(
2506        self,
2507        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2508    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2509    where
2510        R: MinRetries<R2>,
2511    {
2512        self.merge_unordered(other)
2513    }
2514}
2515
2516impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2517where
2518    L: Location<'a> + NoTick,
2519{
2520    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2521    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2522    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2523    /// used to create the atomic section.
2524    ///
2525    /// # Non-Determinism
2526    /// The batch boundaries are non-deterministic and may change across executions.
2527    pub fn batch_atomic(
2528        self,
2529        tick: &Tick<L>,
2530        nondet: NonDet,
2531    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2532        let _ = nondet;
2533        KeyedStream::new(
2534            tick.clone(),
2535            HydroNode::Batch {
2536                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2537                metadata: tick.new_node_metadata(
2538                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2539                ),
2540            },
2541        )
2542    }
2543
2544    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2545    /// See [`KeyedStream::atomic`] for more details.
2546    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2547        KeyedStream::new(
2548            self.location.tick.l.clone(),
2549            HydroNode::EndAtomic {
2550                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2551                metadata: self
2552                    .location
2553                    .tick
2554                    .l
2555                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2556            },
2557        )
2558    }
2559}
2560
2561impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2562where
2563    L: Location<'a>,
2564{
2565    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2566    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2567    /// each key.
2568    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2569        KeyedStream::new(
2570            self.location.outer().clone(),
2571            HydroNode::YieldConcat {
2572                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2573                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2574                    K,
2575                    V,
2576                    L,
2577                    Unbounded,
2578                    O,
2579                    R,
2580                >::collection_kind(
2581                )),
2582            },
2583        )
2584    }
2585
2586    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2587    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2588    /// each key.
2589    ///
2590    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2591    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2592    /// stream's [`Tick`] context.
2593    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2594        let out_location = Atomic {
2595            tick: self.location.clone(),
2596        };
2597
2598        KeyedStream::new(
2599            out_location.clone(),
2600            HydroNode::YieldConcat {
2601                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2602                metadata: out_location.new_node_metadata(KeyedStream::<
2603                    K,
2604                    V,
2605                    Atomic<L>,
2606                    Unbounded,
2607                    O,
2608                    R,
2609                >::collection_kind()),
2610            },
2611        )
2612    }
2613
2614    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2615    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2616    ///
2617    /// This API is particularly useful for stateful computation on batches of data, such as
2618    /// maintaining an accumulated state that is up to date with the current batch.
2619    ///
2620    /// # Example
2621    /// ```rust
2622    /// # #[cfg(feature = "deploy")] {
2623    /// # use hydro_lang::prelude::*;
2624    /// # use futures::StreamExt;
2625    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2626    /// let tick = process.tick();
2627    /// # // ticks are lazy by default, forces the second tick to run
2628    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2629    /// # let batch_first_tick = process
2630    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2631    /// #   .into_keyed()
2632    /// #   .batch(&tick, nondet!(/** test */));
2633    /// # let batch_second_tick = process
2634    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2635    /// #   .into_keyed()
2636    /// #   .batch(&tick, nondet!(/** test */))
2637    /// #   .defer_tick(); // appears on the second tick
2638    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2639    ///
2640    /// input.batch(&tick, nondet!(/** test */))
2641    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2642    ///         *sum += new;
2643    ///     }))).entries().all_ticks()
2644    /// # }, |mut stream| async move {
2645    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2646    /// # let mut results = Vec::new();
2647    /// # for _ in 0..4 {
2648    /// #     results.push(stream.next().await.unwrap());
2649    /// # }
2650    /// # results.sort();
2651    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2652    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2653    /// # results.clear();
2654    /// # for _ in 0..4 {
2655    /// #     results.push(stream.next().await.unwrap());
2656    /// # }
2657    /// # results.sort();
2658    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2659    /// # }));
2660    /// # }
2661    /// ```
2662    pub fn across_ticks<Out: BatchAtomic>(
2663        self,
2664        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2665    ) -> Out::Batched {
2666        thunk(self.all_ticks_atomic()).batched_atomic()
2667    }
2668
2669    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2670    /// tick `T` always has the entries of `self` at tick `T - 1`.
2671    ///
2672    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2673    ///
2674    /// This operator enables stateful iterative processing with ticks, by sending data from one
2675    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2676    ///
2677    /// # Example
2678    /// ```rust
2679    /// # #[cfg(feature = "deploy")] {
2680    /// # use hydro_lang::prelude::*;
2681    /// # use futures::StreamExt;
2682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2683    /// let tick = process.tick();
2684    /// # // ticks are lazy by default, forces the second tick to run
2685    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2686    /// # let batch_first_tick = process
2687    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2688    /// #   .batch(&tick, nondet!(/** test */))
2689    /// #   .into_keyed();
2690    /// # let batch_second_tick = process
2691    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2692    /// #   .batch(&tick, nondet!(/** test */))
2693    /// #   .defer_tick()
2694    /// #   .into_keyed(); // appears on the second tick
2695    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2696    /// # batch_first_tick.chain(batch_second_tick);
2697    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2698    ///     changes_across_ticks // from the current tick
2699    /// )
2700    /// # .entries().all_ticks()
2701    /// # }, |mut stream| async move {
2702    /// // First tick: { 1: [2, 3] }
2703    /// # let mut results = Vec::new();
2704    /// # for _ in 0..2 {
2705    /// #     results.push(stream.next().await.unwrap());
2706    /// # }
2707    /// # results.sort();
2708    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2709    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2710    /// # results.clear();
2711    /// # for _ in 0..4 {
2712    /// #     results.push(stream.next().await.unwrap());
2713    /// # }
2714    /// # results.sort();
2715    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2716    /// // Third tick: { 1: [4], 2: [5] }
2717    /// # results.clear();
2718    /// # for _ in 0..2 {
2719    /// #     results.push(stream.next().await.unwrap());
2720    /// # }
2721    /// # results.sort();
2722    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2723    /// # }));
2724    /// # }
2725    /// ```
2726    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2727        KeyedStream::new(
2728            self.location.clone(),
2729            HydroNode::DeferTick {
2730                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2731                metadata: self.location.new_node_metadata(KeyedStream::<
2732                    K,
2733                    V,
2734                    Tick<L>,
2735                    Bounded,
2736                    O,
2737                    R,
2738                >::collection_kind()),
2739            },
2740        )
2741    }
2742}
2743
2744#[cfg(test)]
2745mod tests {
2746    #[cfg(feature = "deploy")]
2747    use futures::{SinkExt, StreamExt};
2748    #[cfg(feature = "deploy")]
2749    use hydro_deploy::Deployment;
2750    #[cfg(any(feature = "deploy", feature = "sim"))]
2751    use stageleft::q;
2752
2753    #[cfg(any(feature = "deploy", feature = "sim"))]
2754    use crate::compile::builder::FlowBuilder;
2755    #[cfg(feature = "deploy")]
2756    use crate::live_collections::stream::ExactlyOnce;
2757    #[cfg(feature = "sim")]
2758    use crate::live_collections::stream::{NoOrder, TotalOrder};
2759    #[cfg(any(feature = "deploy", feature = "sim"))]
2760    use crate::location::Location;
2761    #[cfg(feature = "sim")]
2762    use crate::networking::TCP;
2763    #[cfg(any(feature = "deploy", feature = "sim"))]
2764    use crate::nondet::nondet;
2765    #[cfg(feature = "deploy")]
2766    use crate::properties::manual_proof;
2767
2768    #[cfg(feature = "deploy")]
2769    #[tokio::test]
2770    async fn reduce_watermark_filter() {
2771        let mut deployment = Deployment::new();
2772
2773        let mut flow = FlowBuilder::new();
2774        let node = flow.process::<()>();
2775        let external = flow.external::<()>();
2776
2777        let node_tick = node.tick();
2778        let watermark = node_tick.singleton(q!(2));
2779
2780        let sum = node
2781            .source_stream(q!(tokio_stream::iter([
2782                (0, 100),
2783                (1, 101),
2784                (2, 102),
2785                (2, 102)
2786            ])))
2787            .into_keyed()
2788            .reduce_watermark(
2789                watermark,
2790                q!(|acc, v| {
2791                    *acc += v;
2792                }),
2793            )
2794            .snapshot(&node_tick, nondet!(/** test */))
2795            .entries()
2796            .all_ticks()
2797            .send_bincode_external(&external);
2798
2799        let nodes = flow
2800            .with_process(&node, deployment.Localhost())
2801            .with_external(&external, deployment.Localhost())
2802            .deploy(&mut deployment);
2803
2804        deployment.deploy().await.unwrap();
2805
2806        let mut out = nodes.connect(sum).await;
2807
2808        deployment.start().await.unwrap();
2809
2810        assert_eq!(out.next().await.unwrap(), (2, 204));
2811    }
2812
2813    #[cfg(feature = "deploy")]
2814    #[tokio::test]
2815    async fn reduce_watermark_bounded() {
2816        let mut deployment = Deployment::new();
2817
2818        let mut flow = FlowBuilder::new();
2819        let node = flow.process::<()>();
2820        let external = flow.external::<()>();
2821
2822        let node_tick = node.tick();
2823        let watermark = node_tick.singleton(q!(2));
2824
2825        let sum = node
2826            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2827            .into_keyed()
2828            .reduce_watermark(
2829                watermark,
2830                q!(|acc, v| {
2831                    *acc += v;
2832                }),
2833            )
2834            .entries()
2835            .send_bincode_external(&external);
2836
2837        let nodes = flow
2838            .with_process(&node, deployment.Localhost())
2839            .with_external(&external, deployment.Localhost())
2840            .deploy(&mut deployment);
2841
2842        deployment.deploy().await.unwrap();
2843
2844        let mut out = nodes.connect(sum).await;
2845
2846        deployment.start().await.unwrap();
2847
2848        assert_eq!(out.next().await.unwrap(), (2, 204));
2849    }
2850
2851    #[cfg(feature = "deploy")]
2852    #[tokio::test]
2853    async fn reduce_watermark_garbage_collect() {
2854        let mut deployment = Deployment::new();
2855
2856        let mut flow = FlowBuilder::new();
2857        let node = flow.process::<()>();
2858        let external = flow.external::<()>();
2859        let (tick_send, tick_trigger) =
2860            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2861
2862        let node_tick = node.tick();
2863        let (watermark_complete_cycle, watermark) =
2864            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
2865        let next_watermark = watermark.clone().map(q!(|v| v + 1));
2866        watermark_complete_cycle.complete_next_tick(next_watermark);
2867
2868        let tick_triggered_input = node_tick
2869            .singleton(q!((3, 103)))
2870            .into_stream()
2871            .filter_if(
2872                tick_trigger
2873                    .clone()
2874                    .batch(&node_tick, nondet!(/** test */))
2875                    .first()
2876                    .is_some(),
2877            )
2878            .all_ticks();
2879
2880        let sum = node
2881            .source_stream(q!(tokio_stream::iter([
2882                (0, 100),
2883                (1, 101),
2884                (2, 102),
2885                (2, 102)
2886            ])))
2887            .merge_unordered(tick_triggered_input)
2888            .into_keyed()
2889            .reduce_watermark(
2890                watermark,
2891                q!(
2892                    |acc, v| {
2893                        *acc += v;
2894                    },
2895                    commutative = manual_proof!(/** integer addition is commutative */)
2896                ),
2897            )
2898            .snapshot(&node_tick, nondet!(/** test */))
2899            .entries()
2900            .all_ticks()
2901            .send_bincode_external(&external);
2902
2903        let nodes = flow
2904            .with_default_optimize()
2905            .with_process(&node, deployment.Localhost())
2906            .with_external(&external, deployment.Localhost())
2907            .deploy(&mut deployment);
2908
2909        deployment.deploy().await.unwrap();
2910
2911        let mut tick_send = nodes.connect(tick_send).await;
2912        let mut out_recv = nodes.connect(sum).await;
2913
2914        deployment.start().await.unwrap();
2915
2916        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2917
2918        tick_send.send(()).await.unwrap();
2919
2920        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2921    }
2922
2923    #[cfg(feature = "sim")]
2924    #[test]
2925    #[should_panic]
2926    fn sim_batch_nondet_size() {
2927        let mut flow = FlowBuilder::new();
2928        let node = flow.process::<()>();
2929
2930        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2931
2932        let tick = node.tick();
2933        let out_recv = input
2934            .batch(&tick, nondet!(/** test */))
2935            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2936            .entries()
2937            .all_ticks()
2938            .sim_output();
2939
2940        flow.sim().exhaustive(async || {
2941            out_recv
2942                .assert_yields_only_unordered([(1, vec![1, 2])])
2943                .await;
2944        });
2945    }
2946
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        // Exhaustively simulates the ways a totally-ordered keyed input can be split into tick
        // batches. It checks the per-key result of every execution and the number of executions
        // explored.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            // Track the running max per key and stop that key once it reaches 2.
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        // The enumeration below counts only tick splittings in which (1, 1) stays ahead of
        // (1, 2), i.e. the per-key order is not shuffled by batching.
        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2981
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        // Same input as `sim_batch_preserves_group_order`, but weakened to `NoOrder`. The
        // simulator may then also permute values within a key across batches, which raises
        // the number of explored executions from 8 to 13.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
3012
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        // `assume_ordering::<TotalOrder>` on an unordered batch lets the simulator choose any
        // order within each key. Some execution therefore makes `first()` yield a value other
        // than 1 for a key, so the assertion fails and the test panics as expected.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }
3038
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        // Counts the executions explored when an unordered keyed batch is observed in a
        // simulator-chosen order via `assume_ordering`. The test only collects the outputs; the
        // exact instance count is the property under test.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
3062
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        // Applies `assume_ordering` to an unbatched (top-level) keyed stream and counts how many
        // simulator-chosen orders are explored. Each execution must still produce exactly one
        // folded entry per key.
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            // Collect values per key, stopping a key once it has two of them.
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }
3099
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        // Checks that values cycled back into an `assume_ordering` stream (via a round trip
        // through `node2`) can be interleaved with the original inputs of the same key. It also
        // pins the number of explored executions.
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback edge: every observed value `v` whose successor `v + 1` is odd is sent to
        // `node2`, back to `node`, and merged into `ordered` under the same key.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }
3164
    /// Exercises `assume_ordering::<TotalOrder>` on a keyed stream whose output is fed back
    /// into its own input under a *different* key.
    ///
    /// Every observed entry with value `10` is rewritten to `(2, 100)`, sent on a round trip
    /// `node -> node2 -> node` over TCP, and merged back into the ordered stream. The test
    /// asserts that at least one simulated instance releases the cycled-back `100` to key 2
    /// ahead of the `20`/`21` entries that were already pending for that key. It also pins the
    /// number of instances reported by `exhaustive`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference for the feedback edge. It is completed below, once the ordered
        // stream it is derived from has been defined.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        // External input and fed-back entries are merged without any order guarantee.
        // A total order is then assumed (non-deterministically) for everything downstream.
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2.
        // The filter matches on the value only; in this test only key 1 ever carries 10.
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                // Round trip node -> node2 -> node over TCP before re-entering `ordered`.
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        // Per key, buffer values in observed order. The fold closure returns `true` once two
        // values are held, which presumably ends the fold for that key. Each key's output is
        // therefore its first two observed values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) / (2, 21), which were already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            // Gather this instance's (key, folded Vec) output entries into a map by key.
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        // Pinned instance count. It is tied to the exact dataflow built above, so changes to
        // the graph (or to how the simulator explores it) will likely require updating it.
        assert_eq!(instance_count, 60);
    }
3241
    /// Checks that `assume_ordering::<TotalOrder>` can interleave a cycled-back value with
    /// still-pending input when the feedback edge crosses a tick boundary.
    ///
    /// On the feedback edge, the ordered stream is batched into `node.tick()` and flattened
    /// again with `all_ticks()`. Each observed value `v` is then fed back as `v + 1` when
    /// `v + 1` is odd. With inputs `0` and `2` on key 1, both cycle back (as `1` and `3`). The
    /// cycled values themselves are not fed back again, because `2` and `4` are even.
    ///
    /// The test asserts that some instance's output for key 1 is exactly `[0, 1]`, meaning the
    /// cycled-back `1` was observed before the pending input `2`. It also pins the number of
    /// instances reported by `exhaustive`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference for the feedback edge, completed below.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        // Merge external input with fed-back entries, then assume a total order downstream.
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback edge: pass through a tick (batch + all_ticks), emit `v + 1` for each
        // observed `v` whose successor is odd, and round-trip node -> node2 -> node over TCP.
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        // Per key, buffer values in observed order. The fold closure returns `true` once two
        // values are held, presumably ending the fold for that key.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Both inputs are fed back: 0 + 1 = 1 and 2 + 1 = 3 are odd.
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            // Gather this instance's (key, folded Vec) output entries into a map by key.
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Look for an instance where key 1 observes 0 and then the cycled-back 1,
            // ahead of the pending input 2.
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        // Pinned instance count. It is tied to the exact dataflow built above (including the
        // tick on the feedback edge), so graph changes will likely require updating it.
        assert_eq!(instance_count, 58);
    }
3305}