Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58///   ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62    K,
63    V,
64    Loc,
65    Bound: Boundedness = Unbounded,
66    Order: Ordering = TotalOrder,
67    Retry: Retries = ExactlyOnce,
68> {
69    pub(crate) location: Loc,
70    pub(crate) ir_node: RefCell<HydroNode>,
71    pub(crate) flow_state: FlowState,
72
73    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
/// Makes a bounded, tick-scoped keyed stream usable wherever a [`DeferTick`] is expected.
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards to an inherent `KeyedStream::defer_tick`. NOTE(review): that method is not
        // visible in this chunk; the explicit type path is relied on to pick the inherent method
        // (inherent items take precedence) rather than recursing into this trait method.
        KeyedStream::defer_tick(self)
    }
}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a>,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a>,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant answers two questions: whether an element is emitted for this input, and
/// whether later inputs are still processed.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Weakens the consistency of this live collection to not guarantee any consistency across
292    /// cluster members (if this collection is on a cluster).
293    pub fn weaken_consistency(self) -> KeyedStream<K, V, L::DropConsistency, B, O, R>
294    where
295        L: Location<'a>,
296    {
297        if L::consistency()
298            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299        {
300            // already no consistency
301            KeyedStream::new(
302                self.location.drop_consistency(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else {
306            KeyedStream::new(
307                self.location.drop_consistency(),
308                HydroNode::Cast {
309                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310                    metadata: self
311                        .location
312                        .drop_consistency()
313                        .new_node_metadata(
314                            KeyedStream::<K, V, L::DropConsistency, B>::collection_kind(),
315                        ),
316                },
317            )
318        }
319    }
320
321    /// Casts this live collection to have the consistency guarantees specified in the given
322    /// location type parameter. The developer must ensure that the strengthened consistency
323    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
325        self,
326        _proof: impl crate::properties::ConsistencyProof,
327    ) -> KeyedStream<K, V, L2, B, O, R>
328    where
329        L: Location<'a>,
330    {
331        if L::consistency() == L2::consistency() {
332            KeyedStream::new(
333                self.location.with_consistency_of(),
334                self.ir_node.replace(HydroNode::Placeholder),
335            )
336        } else {
337            KeyedStream::new(
338                self.location.with_consistency_of(),
339                HydroNode::AssertIsConsistent {
340                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341                    trusted: false,
342                    metadata: self
343                        .location
344                        .clone()
345                        .with_consistency_of::<L2>()
346                        .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
347                },
348            )
349        }
350    }
351
352    /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
353    /// assumption that there is at most one key. If this invariant is broken, the program
354    /// may exhibit undefined behavior, so uses must be carefully vetted.
355    pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
356        Stream::new(
357            self.location.clone(),
358            HydroNode::Cast {
359                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
360                metadata: self
361                    .location
362                    .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
363            },
364        )
365    }
366
367    /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
368    /// there is at most one entry per key. If this invariant is broken, the program may exhibit
369    /// undefined behavior, so uses must be carefully vetted.
370    pub(crate) fn cast_at_most_one_entry_per_key(
371        self,
372    ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
373        KeyedSingleton::new(
374            self.location.clone(),
375            HydroNode::Cast {
376                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377                metadata: self.location.new_node_metadata(KeyedSingleton::<
378                    K,
379                    V,
380                    L,
381                    B::WithBoundedValue,
382                >::collection_kind()),
383            },
384        )
385    }
386
387    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
388        if O::ORDERING_KIND == O2::ORDERING_KIND {
389            KeyedStream::new(
390                self.location.clone(),
391                self.ir_node.replace(HydroNode::Placeholder),
392            )
393        } else {
394            panic!(
395                "Runtime ordering {:?} did not match requested cast {:?}.",
396                O::ORDERING_KIND,
397                O2::ORDERING_KIND
398            )
399        }
400    }
401
402    /// Explicitly "casts" the keyed stream to a type with a different ordering
403    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
404    /// by the type-system.
405    ///
406    /// # Non-Determinism
407    /// This function is used as an escape hatch, and any mistakes in the
408    /// provided ordering guarantee will propagate into the guarantees
409    /// for the rest of the program.
410    pub fn assume_ordering<O2: Ordering>(
411        self,
412        _nondet: NonDet,
413    ) -> KeyedStream<K, V, L::DropConsistency, B, O2, R> {
414        if O::ORDERING_KIND == O2::ORDERING_KIND {
415            self.use_ordering_type().weaken_consistency()
416        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
417            // We can always weaken the ordering guarantee
418            let target_location = self.location.drop_consistency();
419            KeyedStream::new(
420                target_location.clone(),
421                HydroNode::Cast {
422                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
423                    metadata: target_location
424                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
425                },
426            )
427        } else {
428            let target_location = self.location.drop_consistency();
429            KeyedStream::new(
430                target_location.clone(),
431                HydroNode::ObserveNonDet {
432                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
433                    trusted: false,
434                    metadata: target_location
435                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
436                },
437            )
438        }
439    }
440
441    fn assume_ordering_trusted<O2: Ordering>(
442        self,
443        _nondet: NonDet,
444    ) -> KeyedStream<K, V, L, B, O2, R> {
445        if O::ORDERING_KIND == O2::ORDERING_KIND {
446            KeyedStream::new(
447                self.location.clone(),
448                self.ir_node.replace(HydroNode::Placeholder),
449            )
450        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
451            // We can always weaken the ordering guarantee
452            KeyedStream::new(
453                self.location.clone(),
454                HydroNode::Cast {
455                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
456                    metadata: self
457                        .location
458                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
459                },
460            )
461        } else {
462            KeyedStream::new(
463                self.location.clone(),
464                HydroNode::ObserveNonDet {
465                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466                    trusted: true,
467                    metadata: self
468                        .location
469                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
470                },
471            )
472        }
473    }
474
475    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
476    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
477    /// which is always safe because that is the weakest possible guarantee.
478    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
479        self.weaken_ordering::<NoOrder>()
480    }
481
482    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
483    /// enforcing that `O2` is weaker than the input ordering guarantee.
484    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
485        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
486        self.assume_ordering_trusted::<O2>(nondet)
487    }
488
489    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
490    /// implies that `O == TotalOrder`.
491    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
492    where
493        O: IsOrdered,
494    {
495        self.assume_ordering_trusted(nondet!(/** no-op */))
496    }
497
498    /// Explicitly "casts" the keyed stream to a type with a different retries
499    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
500    /// be proven by the type-system.
501    ///
502    /// # Non-Determinism
503    /// This function is used as an escape hatch, and any mistakes in the
504    /// provided retries guarantee will propagate into the guarantees
505    /// for the rest of the program.
506    pub fn assume_retries<R2: Retries>(
507        self,
508        _nondet: NonDet,
509    ) -> KeyedStream<K, V, L::DropConsistency, B, O, R2> {
510        if R::RETRIES_KIND == R2::RETRIES_KIND {
511            KeyedStream::new(
512                self.location.drop_consistency(),
513                self.ir_node.replace(HydroNode::Placeholder),
514            )
515        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
516            // We can always weaken the retries guarantee
517            let target_location = self.location.drop_consistency();
518            KeyedStream::new(
519                target_location.clone(),
520                HydroNode::Cast {
521                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
522                    metadata: target_location
523                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
524                },
525            )
526        } else {
527            let target_location = self.location.drop_consistency();
528            KeyedStream::new(
529                target_location.clone(),
530                HydroNode::ObserveNonDet {
531                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532                    trusted: false,
533                    metadata: target_location
534                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
535                },
536            )
537        }
538    }
539
540    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
541    // is not observable
542    fn assume_retries_trusted<R2: Retries>(
543        self,
544        _nondet: NonDet,
545    ) -> KeyedStream<K, V, L, B, O, R2> {
546        if R::RETRIES_KIND == R2::RETRIES_KIND {
547            KeyedStream::new(
548                self.location.clone(),
549                self.ir_node.replace(HydroNode::Placeholder),
550            )
551        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
552            // We can always weaken the retries guarantee
553            KeyedStream::new(
554                self.location.clone(),
555                HydroNode::Cast {
556                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
557                    metadata: self
558                        .location
559                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
560                },
561            )
562        } else {
563            KeyedStream::new(
564                self.location.clone(),
565                HydroNode::ObserveNonDet {
566                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
567                    trusted: true,
568                    metadata: self
569                        .location
570                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
571                },
572            )
573        }
574    }
575
576    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
577    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
578    /// which is always safe because that is the weakest possible guarantee.
579    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
580        self.weaken_retries::<AtLeastOnce>()
581    }
582
583    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
584    /// enforcing that `R2` is weaker than the input retries guarantee.
585    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
586        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
587        self.assume_retries_trusted::<R2>(nondet)
588    }
589
590    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
591    /// implies that `R == ExactlyOnce`.
592    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
593    where
594        R: IsExactlyOnce,
595    {
596        self.assume_retries_trusted(nondet!(/** no-op */))
597    }
598
599    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
600    /// implies that `B == Bounded`.
601    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
602    where
603        B: IsBounded,
604    {
605        self.weaken_boundedness()
606    }
607
608    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
609    /// which implies that `B == Bounded`.
610    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
611        if B::BOUNDED == B2::BOUNDED {
612            KeyedStream::new(
613                self.location.clone(),
614                self.ir_node.replace(HydroNode::Placeholder),
615            )
616        } else {
617            // We can always weaken the boundedness
618            KeyedStream::new(
619                self.location.clone(),
620                HydroNode::Cast {
621                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
622                    metadata: self
623                        .location
624                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
625                },
626            )
627        }
628    }
629
630    /// Flattens the keyed stream into an unordered stream of key-value pairs.
631    ///
632    /// # Example
633    /// ```rust
634    /// # #[cfg(feature = "deploy")] {
635    /// # use hydro_lang::prelude::*;
636    /// # use futures::StreamExt;
637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638    /// process
639    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
640    ///     .into_keyed()
641    ///     .entries()
642    /// # }, |mut stream| async move {
643    /// // (1, 2), (1, 3), (2, 4) in any order
644    /// # let mut results = Vec::new();
645    /// # for _ in 0..3 {
646    /// #     results.push(stream.next().await.unwrap());
647    /// # }
648    /// # results.sort();
649    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
650    /// # }));
651    /// # }
652    /// ```
653    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
654        Stream::new(
655            self.location.clone(),
656            HydroNode::Cast {
657                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
658                metadata: self
659                    .location
660                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
661            },
662        )
663    }
664
665    /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
666    /// preserving the order of values within each key group but non-deterministically
667    /// interleaving across keys.
668    ///
669    /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
670    ///
671    /// # Non-Determinism
672    /// The interleaving of entries across different keys is non-deterministic.
673    /// Within each key, the original order is preserved.
674    pub fn entries_partially_ordered(
675        self,
676        _nondet: NonDet,
677    ) -> Stream<(K, V), L::DropConsistency, B, TotalOrder, R>
678    where
679        O: IsOrdered,
680    {
681        let target_location = self.location.drop_consistency();
682        Stream::new(
683            target_location.clone(),
684            HydroNode::ObserveNonDet {
685                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
686                trusted: false,
687                metadata: target_location
688                    .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
689            },
690        )
691    }
692
693    /// Flattens the keyed stream into an unordered stream of only the values.
694    ///
695    /// # Example
696    /// ```rust
697    /// # #[cfg(feature = "deploy")] {
698    /// # use hydro_lang::prelude::*;
699    /// # use futures::StreamExt;
700    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
701    /// process
702    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
703    ///     .into_keyed()
704    ///     .values()
705    /// # }, |mut stream| async move {
706    /// // 2, 3, 4 in any order
707    /// # let mut results = Vec::new();
708    /// # for _ in 0..3 {
709    /// #     results.push(stream.next().await.unwrap());
710    /// # }
711    /// # results.sort();
712    /// # assert_eq!(results, vec![2, 3, 4]);
713    /// # }));
714    /// # }
715    /// ```
716    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
717        self.entries().map(q!(|(_, v)| v))
718    }
719
720    /// Flattens the keyed stream into an unordered stream of just the keys.
721    ///
722    /// # Example
723    /// ```rust
724    /// # #[cfg(feature = "deploy")] {
725    /// # use hydro_lang::prelude::*;
726    /// # use futures::StreamExt;
727    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
728    /// # process
729    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
730    /// #     .into_keyed()
731    /// #     .keys()
732    /// # }, |mut stream| async move {
733    /// // 1, 2 in any order
734    /// # let mut results = Vec::new();
735    /// # for _ in 0..2 {
736    /// #     results.push(stream.next().await.unwrap());
737    /// # }
738    /// # results.sort();
739    /// # assert_eq!(results, vec![1, 2]);
740    /// # }));
741    /// # }
742    /// ```
743    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
744    where
745        K: Eq + Hash,
746    {
747        self.entries().map(q!(|(k, _)| k)).unique()
748    }
749
750    /// Transforms each value by invoking `f` on each element, with keys staying the same
751    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
752    ///
753    /// If you do not want to modify the stream and instead only want to view
754    /// each item use [`KeyedStream::inspect`] instead.
755    ///
756    /// # Example
757    /// ```rust
758    /// # #[cfg(feature = "deploy")] {
759    /// # use hydro_lang::prelude::*;
760    /// # use futures::StreamExt;
761    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
762    /// process
763    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
764    ///     .into_keyed()
765    ///     .map(q!(|v| v + 1))
766    /// #   .entries()
767    /// # }, |mut stream| async move {
768    /// // { 1: [3, 4], 2: [5] }
769    /// # let mut results = Vec::new();
770    /// # for _ in 0..3 {
771    /// #     results.push(stream.next().await.unwrap());
772    /// # }
773    /// # results.sort();
774    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
775    /// # }));
776    /// # }
777    /// ```
778    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
779    where
780        F: Fn(V) -> U + 'a,
781    {
782        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
783        let map_f = q!({
784            let orig = f;
785            move |(k, v)| (k, orig(v))
786        })
787        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
788        .into();
789
790        KeyedStream::new(
791            self.location.clone(),
792            HydroNode::Map {
793                f: map_f,
794                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
795                metadata: self
796                    .location
797                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
798            },
799        )
800    }
801
802    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
803    /// re-grouped even they are tuples; instead they will be grouped under the original key.
804    ///
805    /// If you do not want to modify the stream and instead only want to view
806    /// each item use [`KeyedStream::inspect_with_key`] instead.
807    ///
808    /// # Example
809    /// ```rust
810    /// # #[cfg(feature = "deploy")] {
811    /// # use hydro_lang::prelude::*;
812    /// # use futures::StreamExt;
813    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
814    /// process
815    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
816    ///     .into_keyed()
817    ///     .map_with_key(q!(|(k, v)| k + v))
818    /// #   .entries()
819    /// # }, |mut stream| async move {
820    /// // { 1: [3, 4], 2: [6] }
821    /// # let mut results = Vec::new();
822    /// # for _ in 0..3 {
823    /// #     results.push(stream.next().await.unwrap());
824    /// # }
825    /// # results.sort();
826    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
827    /// # }));
828    /// # }
829    /// ```
830    pub fn map_with_key<U, F>(
831        self,
832        f: impl IntoQuotedMut<'a, F, L> + Copy,
833    ) -> KeyedStream<K, U, L, B, O, R>
834    where
835        F: Fn((K, V)) -> U + 'a,
836        K: Clone,
837    {
838        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
839        let map_f = q!({
840            let orig = f;
841            move |(k, v)| {
842                let out = orig((Clone::clone(&k), v));
843                (k, out)
844            }
845        })
846        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
847        .into();
848
849        KeyedStream::new(
850            self.location.clone(),
851            HydroNode::Map {
852                f: map_f,
853                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
854                metadata: self
855                    .location
856                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
857            },
858        )
859    }
860
861    /// Prepends a new value to the key of each element in the stream, producing a new
862    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
863    /// occurs and the elements in each group preserve their original order.
864    ///
865    /// # Example
866    /// ```rust
867    /// # #[cfg(feature = "deploy")] {
868    /// # use hydro_lang::prelude::*;
869    /// # use futures::StreamExt;
870    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
871    /// process
872    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
873    ///     .into_keyed()
874    ///     .prefix_key(q!(|&(k, _)| k % 2))
875    /// #   .entries()
876    /// # }, |mut stream| async move {
877    /// // { (1, 1): [2, 3], (0, 2): [4] }
878    /// # let mut results = Vec::new();
879    /// # for _ in 0..3 {
880    /// #     results.push(stream.next().await.unwrap());
881    /// # }
882    /// # results.sort();
883    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
884    /// # }));
885    /// # }
886    /// ```
887    pub fn prefix_key<K2, F>(
888        self,
889        f: impl IntoQuotedMut<'a, F, L> + Copy,
890    ) -> KeyedStream<(K2, K), V, L, B, O, R>
891    where
892        F: Fn(&(K, V)) -> K2 + 'a,
893    {
894        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
895        let map_f = q!({
896            let orig = f;
897            move |kv| {
898                let out = orig(&kv);
899                ((out, kv.0), kv.1)
900            }
901        })
902        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
903        .into();
904
905        KeyedStream::new(
906            self.location.clone(),
907            HydroNode::Map {
908                f: map_f,
909                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
910                metadata: self
911                    .location
912                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
913            },
914        )
915    }
916
917    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
918    /// `f`, preserving the order of the elements within the group.
919    ///
920    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
921    /// not modify or take ownership of the values. If you need to modify the values while filtering
922    /// use [`KeyedStream::filter_map`] instead.
923    ///
924    /// # Example
925    /// ```rust
926    /// # #[cfg(feature = "deploy")] {
927    /// # use hydro_lang::prelude::*;
928    /// # use futures::StreamExt;
929    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
930    /// process
931    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
932    ///     .into_keyed()
933    ///     .filter(q!(|&x| x > 2))
934    /// #   .entries()
935    /// # }, |mut stream| async move {
936    /// // { 1: [3], 2: [4] }
937    /// # let mut results = Vec::new();
938    /// # for _ in 0..2 {
939    /// #     results.push(stream.next().await.unwrap());
940    /// # }
941    /// # results.sort();
942    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
943    /// # }));
944    /// # }
945    /// ```
946    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
947    where
948        F: Fn(&V) -> bool + 'a,
949    {
950        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
951        let filter_f = q!({
952            let orig = f;
953            move |t: &(_, _)| orig(&t.1)
954        })
955        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
956        .into();
957
958        KeyedStream::new(
959            self.location.clone(),
960            HydroNode::Filter {
961                f: filter_f,
962                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
963                metadata: self.location.new_node_metadata(Self::collection_kind()),
964            },
965        )
966    }
967
968    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
969    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
970    ///
971    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
972    /// not modify or take ownership of the values. If you need to modify the values while filtering
973    /// use [`KeyedStream::filter_map_with_key`] instead.
974    ///
975    /// # Example
976    /// ```rust
977    /// # #[cfg(feature = "deploy")] {
978    /// # use hydro_lang::prelude::*;
979    /// # use futures::StreamExt;
980    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
981    /// process
982    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
983    ///     .into_keyed()
984    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
985    /// #   .entries()
986    /// # }, |mut stream| async move {
987    /// // { 1: [3], 2: [4] }
988    /// # let mut results = Vec::new();
989    /// # for _ in 0..2 {
990    /// #     results.push(stream.next().await.unwrap());
991    /// # }
992    /// # results.sort();
993    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
994    /// # }));
995    /// # }
996    /// ```
997    pub fn filter_with_key<F>(
998        self,
999        f: impl IntoQuotedMut<'a, F, L> + Copy,
1000    ) -> KeyedStream<K, V, L, B, O, R>
1001    where
1002        F: Fn(&(K, V)) -> bool + 'a,
1003    {
1004        let filter_f = f
1005            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1006            .into();
1007
1008        KeyedStream::new(
1009            self.location.clone(),
1010            HydroNode::Filter {
1011                f: filter_f,
1012                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1013                metadata: self.location.new_node_metadata(Self::collection_kind()),
1014            },
1015        )
1016    }
1017
1018    /// An operator that both filters and maps each value, with keys staying the same.
1019    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1020    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
1021    ///
1022    /// # Example
1023    /// ```rust
1024    /// # #[cfg(feature = "deploy")] {
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// process
1029    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
1030    ///     .into_keyed()
1031    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
1032    /// #   .entries()
1033    /// # }, |mut stream| async move {
1034    /// // { 1: [2], 2: [4] }
1035    /// # let mut results = Vec::new();
1036    /// # for _ in 0..2 {
1037    /// #     results.push(stream.next().await.unwrap());
1038    /// # }
1039    /// # results.sort();
1040    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1041    /// # }));
1042    /// # }
1043    /// ```
1044    pub fn filter_map<U, F>(
1045        self,
1046        f: impl IntoQuotedMut<'a, F, L> + Copy,
1047    ) -> KeyedStream<K, U, L, B, O, R>
1048    where
1049        F: Fn(V) -> Option<U> + 'a,
1050    {
1051        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1052        let filter_map_f = q!({
1053            let orig = f;
1054            move |(k, v)| orig(v).map(|o| (k, o))
1055        })
1056        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1057        .into();
1058
1059        KeyedStream::new(
1060            self.location.clone(),
1061            HydroNode::FilterMap {
1062                f: filter_map_f,
1063                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1064                metadata: self
1065                    .location
1066                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1067            },
1068        )
1069    }
1070
1071    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
1072    /// re-grouped even they are tuples; instead they will be grouped under the original key.
1073    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1074    ///
1075    /// # Example
1076    /// ```rust
1077    /// # #[cfg(feature = "deploy")] {
1078    /// # use hydro_lang::prelude::*;
1079    /// # use futures::StreamExt;
1080    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1081    /// process
1082    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
1083    ///     .into_keyed()
1084    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
1085    /// #   .entries()
1086    /// # }, |mut stream| async move {
1087    /// // { 2: [2] }
1088    /// # let mut results = Vec::new();
1089    /// # for _ in 0..1 {
1090    /// #     results.push(stream.next().await.unwrap());
1091    /// # }
1092    /// # results.sort();
1093    /// # assert_eq!(results, vec![(2, 2)]);
1094    /// # }));
1095    /// # }
1096    /// ```
1097    pub fn filter_map_with_key<U, F>(
1098        self,
1099        f: impl IntoQuotedMut<'a, F, L> + Copy,
1100    ) -> KeyedStream<K, U, L, B, O, R>
1101    where
1102        F: Fn((K, V)) -> Option<U> + 'a,
1103        K: Clone,
1104    {
1105        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1106        let filter_map_f = q!({
1107            let orig = f;
1108            move |(k, v)| {
1109                let out = orig((Clone::clone(&k), v));
1110                out.map(|o| (k, o))
1111            }
1112        })
1113        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1114        .into();
1115
1116        KeyedStream::new(
1117            self.location.clone(),
1118            HydroNode::FilterMap {
1119                f: filter_map_f,
1120                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1121                metadata: self
1122                    .location
1123                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1124            },
1125        )
1126    }
1127
1128    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1129    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1130    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1131    ///
1132    /// # Example
1133    /// ```rust
1134    /// # #[cfg(feature = "deploy")] {
1135    /// # use hydro_lang::prelude::*;
1136    /// # use futures::StreamExt;
1137    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1138    /// let tick = process.tick();
1139    /// let batch = process
1140    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1141    ///   .into_keyed()
1142    ///   .batch(&tick, nondet!(/** test */));
1143    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1144    /// batch.cross_singleton(count).all_ticks().entries()
1145    /// # }, |mut stream| async move {
1146    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1147    /// # let mut results = Vec::new();
1148    /// # for _ in 0..3 {
1149    /// #     results.push(stream.next().await.unwrap());
1150    /// # }
1151    /// # results.sort();
1152    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1153    /// # }));
1154    /// # }
1155    /// ```
1156    pub fn cross_singleton<O2>(
1157        self,
1158        other: impl Into<Optional<O2, L, Bounded>>,
1159    ) -> KeyedStream<K, (V, O2), L, B, O, R>
1160    where
1161        O2: Clone,
1162    {
1163        let other: Optional<O2, L, Bounded> = other.into();
1164        check_matching_location(&self.location, &other.location);
1165
1166        Stream::new(
1167            self.location.clone(),
1168            HydroNode::CrossSingleton {
1169                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1170                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1171                metadata: self
1172                    .location
1173                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1174            },
1175        )
1176        .map(q!(|((k, v), o2)| (k, (v, o2))))
1177        .into_keyed()
1178    }
1179
1180    /// For each value `v` in each group, transform `v` using `f` and then treat the
1181    /// result as an [`Iterator`] to produce values one by one within the same group.
1182    /// The implementation for [`Iterator`] for the output type `I` must produce items
1183    /// in a **deterministic** order.
1184    ///
1185    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1186    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1187    ///
1188    /// # Example
1189    /// ```rust
1190    /// # #[cfg(feature = "deploy")] {
1191    /// # use hydro_lang::prelude::*;
1192    /// # use futures::StreamExt;
1193    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1194    /// process
1195    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1196    ///     .into_keyed()
1197    ///     .flat_map_ordered(q!(|x| x))
1198    /// #   .entries()
1199    /// # }, |mut stream| async move {
1200    /// // { 1: [2, 3, 4], 2: [5, 6] }
1201    /// # let mut results = Vec::new();
1202    /// # for _ in 0..5 {
1203    /// #     results.push(stream.next().await.unwrap());
1204    /// # }
1205    /// # results.sort();
1206    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1207    /// # }));
1208    /// # }
1209    /// ```
1210    pub fn flat_map_ordered<U, I, F>(
1211        self,
1212        f: impl IntoQuotedMut<'a, F, L> + Copy,
1213    ) -> KeyedStream<K, U, L, B, O, R>
1214    where
1215        I: IntoIterator<Item = U>,
1216        F: Fn(V) -> I + 'a,
1217        K: Clone,
1218    {
1219        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1220        let flat_map_f = q!({
1221            let orig = f;
1222            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1223        })
1224        .splice_fn1_ctx::<(K, V), _>(&self.location)
1225        .into();
1226
1227        KeyedStream::new(
1228            self.location.clone(),
1229            HydroNode::FlatMap {
1230                f: flat_map_f,
1231                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1232                metadata: self
1233                    .location
1234                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1235            },
1236        )
1237    }
1238
1239    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1240    /// for the output type `I` to produce items in any order.
1241    ///
1242    /// # Example
1243    /// ```rust
1244    /// # #[cfg(feature = "deploy")] {
1245    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1246    /// # use futures::StreamExt;
1247    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1248    /// process
1249    ///     .source_iter(q!(vec![
1250    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1251    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1252    ///     ]))
1253    ///     .into_keyed()
1254    ///     .flat_map_unordered(q!(|x| x))
1255    /// #   .entries()
1256    /// # }, |mut stream| async move {
1257    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1258    /// # let mut results = Vec::new();
1259    /// # for _ in 0..4 {
1260    /// #     results.push(stream.next().await.unwrap());
1261    /// # }
1262    /// # results.sort();
1263    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1264    /// # }));
1265    /// # }
1266    /// ```
1267    pub fn flat_map_unordered<U, I, F>(
1268        self,
1269        f: impl IntoQuotedMut<'a, F, L> + Copy,
1270    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1271    where
1272        I: IntoIterator<Item = U>,
1273        F: Fn(V) -> I + 'a,
1274        K: Clone,
1275    {
1276        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1277        let flat_map_f = q!({
1278            let orig = f;
1279            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1280        })
1281        .splice_fn1_ctx::<(K, V), _>(&self.location)
1282        .into();
1283
1284        KeyedStream::new(
1285            self.location.clone(),
1286            HydroNode::FlatMap {
1287                f: flat_map_f,
1288                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1289                metadata: self
1290                    .location
1291                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1292            },
1293        )
1294    }
1295
1296    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1297    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1298    /// items in a **deterministic** order.
1299    ///
1300    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1301    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1302    ///
1303    /// # Example
1304    /// ```rust
1305    /// # #[cfg(feature = "deploy")] {
1306    /// # use hydro_lang::prelude::*;
1307    /// # use futures::StreamExt;
1308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1309    /// process
1310    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1311    ///     .into_keyed()
1312    ///     .flatten_ordered()
1313    /// #   .entries()
1314    /// # }, |mut stream| async move {
1315    /// // { 1: [2, 3, 4], 2: [5, 6] }
1316    /// # let mut results = Vec::new();
1317    /// # for _ in 0..5 {
1318    /// #     results.push(stream.next().await.unwrap());
1319    /// # }
1320    /// # results.sort();
1321    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1322    /// # }));
1323    /// # }
1324    /// ```
1325    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1326    where
1327        V: IntoIterator<Item = U>,
1328        K: Clone,
1329    {
1330        self.flat_map_ordered(q!(|d| d))
1331    }
1332
1333    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1334    /// for the value type `V` to produce items in any order.
1335    ///
1336    /// # Example
1337    /// ```rust
1338    /// # #[cfg(feature = "deploy")] {
1339    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1340    /// # use futures::StreamExt;
1341    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1342    /// process
1343    ///     .source_iter(q!(vec![
1344    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1345    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1346    ///     ]))
1347    ///     .into_keyed()
1348    ///     .flatten_unordered()
1349    /// #   .entries()
1350    /// # }, |mut stream| async move {
1351    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1352    /// # let mut results = Vec::new();
1353    /// # for _ in 0..4 {
1354    /// #     results.push(stream.next().await.unwrap());
1355    /// # }
1356    /// # results.sort();
1357    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1358    /// # }));
1359    /// # }
1360    /// ```
1361    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1362    where
1363        V: IntoIterator<Item = U>,
1364        K: Clone,
1365    {
1366        self.flat_map_unordered(q!(|d| d))
1367    }
1368
1369    /// An operator which allows you to "inspect" each element of a stream without
1370    /// modifying it. The closure `f` is called on a reference to each value. This is
1371    /// mainly useful for debugging, and should not be used to generate side-effects.
1372    ///
1373    /// # Example
1374    /// ```rust
1375    /// # #[cfg(feature = "deploy")] {
1376    /// # use hydro_lang::prelude::*;
1377    /// # use futures::StreamExt;
1378    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1379    /// process
1380    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1381    ///     .into_keyed()
1382    ///     .inspect(q!(|v| println!("{}", v)))
1383    /// #   .entries()
1384    /// # }, |mut stream| async move {
1385    /// # let mut results = Vec::new();
1386    /// # for _ in 0..3 {
1387    /// #     results.push(stream.next().await.unwrap());
1388    /// # }
1389    /// # results.sort();
1390    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1391    /// # }));
1392    /// # }
1393    /// ```
1394    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1395    where
1396        F: Fn(&V) + 'a,
1397    {
1398        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1399        let inspect_f = q!({
1400            let orig = f;
1401            move |t: &(_, _)| orig(&t.1)
1402        })
1403        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1404        .into();
1405
1406        KeyedStream::new(
1407            self.location.clone(),
1408            HydroNode::Inspect {
1409                f: inspect_f,
1410                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1411                metadata: self.location.new_node_metadata(Self::collection_kind()),
1412            },
1413        )
1414    }
1415
1416    /// An operator which allows you to "inspect" each element of a stream without
1417    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1418    /// mainly useful for debugging, and should not be used to generate side-effects.
1419    ///
1420    /// # Example
1421    /// ```rust
1422    /// # #[cfg(feature = "deploy")] {
1423    /// # use hydro_lang::prelude::*;
1424    /// # use futures::StreamExt;
1425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1426    /// process
1427    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1428    ///     .into_keyed()
1429    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1430    /// #   .entries()
1431    /// # }, |mut stream| async move {
1432    /// # let mut results = Vec::new();
1433    /// # for _ in 0..3 {
1434    /// #     results.push(stream.next().await.unwrap());
1435    /// # }
1436    /// # results.sort();
1437    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1438    /// # }));
1439    /// # }
1440    /// ```
1441    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1442    where
1443        F: Fn(&(K, V)) + 'a,
1444    {
1445        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1446
1447        KeyedStream::new(
1448            self.location.clone(),
1449            HydroNode::Inspect {
1450                f: inspect_f,
1451                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1452                metadata: self.location.new_node_metadata(Self::collection_kind()),
1453            },
1454        )
1455    }
1456
1457    /// An operator which allows you to "name" a `HydroNode`.
1458    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1459    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1460        {
1461            let mut node = self.ir_node.borrow_mut();
1462            let metadata = node.metadata_mut();
1463            metadata.tag = Some(name.to_owned());
1464        }
1465        self
1466    }
1467
1468    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1469    ///
1470    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1471    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1472    /// early by returning `None`.
1473    ///
1474    /// The function takes a mutable reference to the accumulator and the current element, and returns
1475    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1476    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1477    ///
1478    /// # Example
1479    /// ```rust
1480    /// # #[cfg(feature = "deploy")] {
1481    /// # use hydro_lang::prelude::*;
1482    /// # use futures::StreamExt;
1483    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1484    /// process
1485    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1486    ///     .into_keyed()
1487    ///     .scan(
1488    ///         q!(|| 0),
1489    ///         q!(|acc, x| {
1490    ///             *acc += x;
1491    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1492    ///         }),
1493    ///     )
1494    /// #   .entries()
1495    /// # }, |mut stream| async move {
1496    /// // Output: { 0: [1], 1: [3, 7] }
1497    /// # let mut results = Vec::new();
1498    /// # for _ in 0..3 {
1499    /// #     results.push(stream.next().await.unwrap());
1500    /// # }
1501    /// # results.sort();
1502    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1503    /// # }));
1504    /// # }
1505    /// ```
1506    pub fn scan<A, U, I, F>(
1507        self,
1508        init: impl IntoQuotedMut<'a, I, L> + Copy,
1509        f: impl IntoQuotedMut<'a, F, L> + Copy,
1510    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1511    where
1512        O: IsOrdered,
1513        R: IsExactlyOnce,
1514        K: Clone + Eq + Hash,
1515        I: Fn() -> A + 'a,
1516        F: Fn(&mut A, V) -> Option<U> + 'a,
1517    {
1518        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1519        self.make_totally_ordered().make_exactly_once().generator(
1520            init,
1521            q!({
1522                let orig = f;
1523                move |state, v| {
1524                    if let Some(out) = orig(state, v) {
1525                        Generate::Yield(out)
1526                    } else {
1527                        Generate::Break
1528                    }
1529                }
1530            }),
1531        )
1532    }
1533
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// Once the processing logic returns [`Generate::Return`] or [`Generate::Break`] for a key,
    /// every later value for that key is dropped without invoking the logic again. The
    /// per-key bookkeeping entry is currently retained even after a key terminates.
    ///
    /// The input is converted to [`TotalOrder`] / [`ExactlyOnce`] before processing (which the
    /// `IsOrdered` / `IsExactlyOnce` bounds permit), so the output carries those guarantees.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Wrap the user closures in `ManualExpr` so they can be referenced by name (`init`, `f`)
        // inside the quoted scan closure below; splicing against the location is deferred to
        // whatever `ctx` is supplied at that point.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Scan state: one entry per key. The entry is `Some(state)` while the key is active and
        // becomes `None` once the generator has returned or broken for that key.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // Every branch returns an outer `Some`, so the scan never stops early (presumably an
        // outer `None` would end it; confirm against `HydroNode::Scan`). The inner `Option` is
        // the element to emit for this input, if any. Terminated keys fall into the `else`
        // branch and emit nothing.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // `Option` is iterable, so flat-mapping the identity drops the `None` placeholders and
        // unwraps the emitted `(K, U)` pairs.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1655
1656    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1657    /// in-order across the values in each group. But the aggregation function returns a boolean,
1658    /// which when true indicates that the aggregated result is complete and can be released to
1659    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1660    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1661    /// normal stream elements.
1662    ///
1663    /// # Example
1664    /// ```rust
1665    /// # #[cfg(feature = "deploy")] {
1666    /// # use hydro_lang::prelude::*;
1667    /// # use futures::StreamExt;
1668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669    /// process
1670    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1671    ///     .into_keyed()
1672    ///     .fold_early_stop(
1673    ///         q!(|| 0),
1674    ///         q!(|acc, x| {
1675    ///             *acc += x;
1676    ///             x % 2 == 0
1677    ///         }),
1678    ///     )
1679    /// #   .entries()
1680    /// # }, |mut stream| async move {
1681    /// // Output: { 0: 2, 1: 9 }
1682    /// # let mut results = Vec::new();
1683    /// # for _ in 0..2 {
1684    /// #     results.push(stream.next().await.unwrap());
1685    /// # }
1686    /// # results.sort();
1687    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1688    /// # }));
1689    /// # }
1690    /// ```
1691    pub fn fold_early_stop<A, I, F>(
1692        self,
1693        init: impl IntoQuotedMut<'a, I, L> + Copy,
1694        f: impl IntoQuotedMut<'a, F, L> + Copy,
1695    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1696    where
1697        O: IsOrdered,
1698        R: IsExactlyOnce,
1699        K: Clone + Eq + Hash,
1700        I: Fn() -> A + 'a,
1701        F: Fn(&mut A, V) -> bool + 'a,
1702    {
1703        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1704        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1705        let out_without_bound_cast = self.generator(
1706            q!(move || Some(init())),
1707            q!(move |key_state, v| {
1708                if let Some(key_state_value) = key_state.as_mut() {
1709                    if f(key_state_value, v) {
1710                        Generate::Return(key_state.take().unwrap())
1711                    } else {
1712                        Generate::Continue
1713                    }
1714                } else {
1715                    unreachable!()
1716                }
1717            }),
1718        );
1719
1720        // SAFETY: The generator will only ever return at most one value per key, since once it
1721        // returns a value for a key it will never process any more values for that key.
1722        out_without_bound_cast.cast_at_most_one_entry_per_key()
1723    }
1724
1725    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1726    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1727    /// otherwise the first element would be non-deterministic.
1728    ///
1729    /// # Example
1730    /// ```rust
1731    /// # #[cfg(feature = "deploy")] {
1732    /// # use hydro_lang::prelude::*;
1733    /// # use futures::StreamExt;
1734    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735    /// process
1736    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1737    ///     .into_keyed()
1738    ///     .first()
1739    /// #   .entries()
1740    /// # }, |mut stream| async move {
1741    /// // Output: { 0: 2, 1: 3 }
1742    /// # let mut results = Vec::new();
1743    /// # for _ in 0..2 {
1744    /// #     results.push(stream.next().await.unwrap());
1745    /// # }
1746    /// # results.sort();
1747    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1748    /// # }));
1749    /// # }
1750    /// ```
1751    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1752    where
1753        O: IsOrdered,
1754        R: IsExactlyOnce,
1755        K: Clone + Eq + Hash,
1756    {
1757        self.fold_early_stop(
1758            q!(|| None),
1759            q!(|acc, v| {
1760                *acc = Some(v);
1761                true
1762            }),
1763        )
1764        .map(q!(|v| v.unwrap()))
1765    }
1766
    /// Returns a keyed stream containing at most the first `n` values per key,
    /// preserving the original order within each group. Similar to SQL `LIMIT`
    /// applied per group. If `n` is `0`, no values are emitted for any key.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
    /// retries, since the result depends on the order and cardinality of elements
    /// within each group.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
    ///     .into_keyed()
    ///     .limit(q!(2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [10, 20], 2: [40, 50] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
    /// # }));
    /// # }
    /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
    {
        // Per-key generator state: the number of values already emitted for that key.
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                // Only reachable when `n == 0`. For any positive `n`, the value that brings the
                // count up to `n` is emitted via `Return`, which finishes the key before this
                // check could succeed.
                if *count == n {
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        // Emit the n-th value and finish the key; the generator drops every
                        // later value for it without calling this closure again.
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1822
    /// Assigns a zero-based index to each value within each key group, emitting
    /// `(K, (index, V))` tuples with per-key sequential indices.
    ///
    /// The input must be totally ordered and [`ExactlyOnce`], because the assigned indices
    /// depend on the arrival order and the number of values in each group.
    ///
    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
    /// This is a streaming operator that processes elements as they arrive.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
    ///     .into_keyed()
    ///     .enumerate()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
    /// # assert_eq!(key2, vec![(0, 20)]);
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Eq + Hash + Clone,
    {
        // Per-key counter starting at 0. The value read before incrementing is the index of the
        // current element. The closure always returns `Some`, so every input value is emitted
        // (presumably a `None` would stop the key's scan; see `KeyedStream::scan`).
        self.scan(
            q!(|| 0),
            q!(|acc, next| {
                let curr = *acc;
                *acc += 1;
                Some((curr, next))
            }),
        )
    }
1868
1869    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1870    ///
1871    /// # Example
1872    /// ```rust
1873    /// # #[cfg(feature = "deploy")] {
1874    /// # use hydro_lang::prelude::*;
1875    /// # use futures::StreamExt;
1876    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1877    /// let tick = process.tick();
1878    /// let numbers = process
1879    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1880    ///     .into_keyed();
1881    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1882    /// batch
1883    ///     .value_counts()
1884    ///     .entries()
1885    ///     .all_ticks()
1886    /// # }, |mut stream| async move {
1887    /// // (1, 3), (2, 2)
1888    /// # let mut results = Vec::new();
1889    /// # for _ in 0..2 {
1890    /// #     results.push(stream.next().await.unwrap());
1891    /// # }
1892    /// # results.sort();
1893    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1894    /// # }));
1895    /// # }
1896    /// ```
1897    pub fn value_counts(
1898        self,
1899    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1900    where
1901        R: IsExactlyOnce,
1902        K: Eq + Hash,
1903    {
1904        self.make_exactly_once()
1905            .assume_ordering_trusted(
1906                nondet!(/** ordering within each group affects neither result nor intermediates */),
1907            )
1908            .fold(
1909                q!(|| 0),
1910                q!(
1911                    |acc, _| *acc += 1,
1912                    monotone = manual_proof!(/** += 1 is monotonic */)
1913                ),
1914            )
1915    }
1916
1917    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1918    /// group via the `comb` closure.
1919    ///
1920    /// Depending on the input stream guarantees, the closure may need to be commutative
1921    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1922    ///
1923    /// If the input and output value types are the same and do not require initialization then use
1924    /// [`KeyedStream::reduce`].
1925    ///
1926    /// # Example
1927    /// ```rust
1928    /// # #[cfg(feature = "deploy")] {
1929    /// # use hydro_lang::prelude::*;
1930    /// # use futures::StreamExt;
1931    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1932    /// let tick = process.tick();
1933    /// let numbers = process
1934    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1935    ///     .into_keyed();
1936    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1937    /// batch
1938    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1939    ///     .entries()
1940    ///     .all_ticks()
1941    /// # }, |mut stream| async move {
1942    /// // (1, false), (2, true)
1943    /// # let mut results = Vec::new();
1944    /// # for _ in 0..2 {
1945    /// #     results.push(stream.next().await.unwrap());
1946    /// # }
1947    /// # results.sort();
1948    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1949    /// # }));
1950    /// # }
1951    /// ```
1952    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1953        self,
1954        init: impl IntoQuotedMut<'a, I, L>,
1955        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1956    ) -> KeyedSingleton<K, A, L, B2>
1957    where
1958        K: Eq + Hash,
1959        C: ValidCommutativityFor<O>,
1960        Idemp: ValidIdempotenceFor<R>,
1961        B: ApplyMonotoneKeyedStream<M, B2>,
1962    {
1963        let init = init.splice_fn0_ctx(&self.location).into();
1964        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1965        proof.register_proof(&comb);
1966
1967        let retried = self
1968            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */));
1969
1970        KeyedSingleton::new(
1971            retried.location.clone(),
1972            HydroNode::FoldKeyed {
1973                init,
1974                acc: comb.into(),
1975                input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1976                metadata: retried
1977                    .location
1978                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1979            },
1980        )
1981        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1982    }
1983
1984    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1985    /// group via the `comb` closure.
1986    ///
1987    /// Depending on the input stream guarantees, the closure may need to be commutative
1988    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1989    ///
1990    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1991    ///
1992    /// # Example
1993    /// ```rust
1994    /// # #[cfg(feature = "deploy")] {
1995    /// # use hydro_lang::prelude::*;
1996    /// # use futures::StreamExt;
1997    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1998    /// let tick = process.tick();
1999    /// let numbers = process
2000    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
2001    ///     .into_keyed();
2002    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2003    /// batch
2004    ///     .reduce(q!(|acc, x| *acc |= x))
2005    ///     .entries()
2006    ///     .all_ticks()
2007    /// # }, |mut stream| async move {
2008    /// // (1, false), (2, true)
2009    /// # let mut results = Vec::new();
2010    /// # for _ in 0..2 {
2011    /// #     results.push(stream.next().await.unwrap());
2012    /// # }
2013    /// # results.sort();
2014    /// # assert_eq!(results, vec![(1, false), (2, true)]);
2015    /// # }));
2016    /// # }
2017    /// ```
2018    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
2019        self,
2020        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2021    ) -> KeyedSingleton<K, V, L, B>
2022    where
2023        K: Eq + Hash,
2024        C: ValidCommutativityFor<O>,
2025        Idemp: ValidIdempotenceFor<R>,
2026    {
2027        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2028        proof.register_proof(&f);
2029
2030        let ordered = self
2031            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2032            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2033
2034        KeyedSingleton::new(
2035            ordered.location.clone(),
2036            HydroNode::ReduceKeyed {
2037                f: f.into(),
2038                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2039                metadata: ordered
2040                    .location
2041                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2042            },
2043        )
2044        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2045    }
2046
2047    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
2048    /// are automatically deleted.
2049    ///
2050    /// Depending on the input stream guarantees, the closure may need to be commutative
2051    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
2052    ///
2053    /// # Example
2054    /// ```rust
2055    /// # #[cfg(feature = "deploy")] {
2056    /// # use hydro_lang::prelude::*;
2057    /// # use futures::StreamExt;
2058    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2059    /// let tick = process.tick();
2060    /// let watermark = tick.singleton(q!(2));
2061    /// let numbers = process
2062    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
2063    ///     .into_keyed();
2064    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2065    /// batch
2066    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
2067    ///     .entries()
2068    ///     .all_ticks()
2069    /// # }, |mut stream| async move {
2070    /// // (2, true)
2071    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2072    /// # }));
2073    /// # }
2074    /// ```
2075    pub fn reduce_watermark<O2, F, C, Idemp>(
2076        self,
2077        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
2078        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2079    ) -> KeyedSingleton<K, V, L, B>
2080    where
2081        K: Eq + Hash,
2082        O2: Clone,
2083        F: Fn(&mut V, V) + 'a,
2084        C: ValidCommutativityFor<O>,
2085        Idemp: ValidIdempotenceFor<R>,
2086    {
2087        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
2088        check_matching_location(&self.location.root(), other.location.outer());
2089        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2090        proof.register_proof(&f);
2091
2092        let ordered = self
2093            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2094            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2095
2096        KeyedSingleton::new(
2097            ordered.location.clone(),
2098            HydroNode::ReduceKeyedWatermark {
2099                f: f.into(),
2100                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2101                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2102                metadata: ordered
2103                    .location
2104                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2105            },
2106        )
2107        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2108    }
2109
2110    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2111    /// whose keys are not in the bounded stream.
2112    ///
2113    /// # Example
2114    /// ```rust
2115    /// # #[cfg(feature = "deploy")] {
2116    /// # use hydro_lang::prelude::*;
2117    /// # use futures::StreamExt;
2118    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2119    /// let tick = process.tick();
2120    /// let keyed_stream = process
2121    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2122    ///     .batch(&tick, nondet!(/** test */))
2123    ///     .into_keyed();
2124    /// let keys_to_remove = process
2125    ///     .source_iter(q!(vec![1, 2]))
2126    ///     .batch(&tick, nondet!(/** test */));
2127    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2128    /// #   .entries()
2129    /// # }, |mut stream| async move {
2130    /// // { 3: ['c'], 4: ['d'] }
2131    /// # let mut results = Vec::new();
2132    /// # for _ in 0..2 {
2133    /// #     results.push(stream.next().await.unwrap());
2134    /// # }
2135    /// # results.sort();
2136    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2137    /// # }));
2138    /// # }
2139    /// ```
2140    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2141        self,
2142        other: Stream<K, L, Bounded, O2, R2>,
2143    ) -> Self
2144    where
2145        K: Eq + Hash,
2146    {
2147        check_matching_location(&self.location, &other.location);
2148
2149        KeyedStream::new(
2150            self.location.clone(),
2151            HydroNode::AntiJoin {
2152                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2153                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2154                metadata: self.location.new_node_metadata(Self::collection_kind()),
2155            },
2156        )
2157    }
2158
2159    /// Emit a keyed stream containing keys shared between two keyed streams,
2160    /// where each value in the output keyed stream is a tuple of
2161    /// (self's value, other's value).
2162    /// If there are multiple values for the same key, this performs a cross product
2163    /// for each matching key.
2164    ///
2165    /// # Example
2166    /// ```rust
2167    /// # #[cfg(feature = "deploy")] {
2168    /// # use hydro_lang::prelude::*;
2169    /// # use futures::StreamExt;
2170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2171    /// let tick = process.tick();
2172    /// let keyed_data = process
2173    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2174    ///     .into_keyed()
2175    ///     .batch(&tick, nondet!(/** test */));
2176    /// let other_data = process
2177    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2178    ///     .into_keyed()
2179    ///     .batch(&tick, nondet!(/** test */));
2180    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2181    /// # }, |mut stream| async move {
2182    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2183    /// # let mut results = vec![];
2184    /// # for _ in 0..4 {
2185    /// #     results.push(stream.next().await.unwrap());
2186    /// # }
2187    /// # results.sort();
2188    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2189    /// # }));
2190    /// # }
2191    /// ```
2192    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2193    pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2194        self,
2195        other: KeyedStream<K, V2, L, B2, O2, R2>,
2196    ) -> KeyedStream<
2197        K,
2198        (V, V2),
2199        L,
2200        B,
2201        B2::PreserveOrderIfBounded<NoOrder>,
2202        <R as MinRetries<R2>>::Min,
2203    >
2204    where
2205        K: Eq + Hash + Clone,
2206        R: MinRetries<R2>,
2207        V: Clone,
2208        V2: Clone,
2209    {
2210        self.entries().join(other.entries()).into_keyed()
2211    }
2212
2213    /// Deduplicates values within each key group, emitting each unique value per key
2214    /// exactly once.
2215    ///
2216    /// # Example
2217    /// ```rust
2218    /// # #[cfg(feature = "deploy")] {
2219    /// # use hydro_lang::prelude::*;
2220    /// # use futures::StreamExt;
2221    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2222    /// process
2223    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2224    ///     .into_keyed()
2225    ///     .unique()
2226    /// # .entries()
2227    /// # }, |mut stream| async move {
2228    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2229    /// # let mut results = Vec::new();
2230    /// # for _ in 0..4 {
2231    /// #     results.push(stream.next().await.unwrap());
2232    /// # }
2233    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2234    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2235    /// # key1.sort();
2236    /// # key2.sort();
2237    /// # assert_eq!(key1, vec![10, 20]);
2238    /// # assert_eq!(key2, vec![20, 30]);
2239    /// # }));
2240    /// # }
2241    /// ```
2242    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2243    where
2244        K: Eq + Hash + Clone,
2245        V: Eq + Hash + Clone,
2246    {
2247        self.entries().unique().into_keyed()
2248    }
2249
2250    /// Sorts the values within each key group in ascending order.
2251    ///
2252    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2253    /// each group. This operator will block until all elements in the input stream
2254    /// are available, so it requires the input stream to be [`Bounded`].
2255    ///
2256    /// # Example
2257    /// ```rust
2258    /// # #[cfg(feature = "deploy")] {
2259    /// # use hydro_lang::prelude::*;
2260    /// # use futures::StreamExt;
2261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2262    /// let tick = process.tick();
2263    /// let numbers = process
2264    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2265    ///     .into_keyed();
2266    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2267    /// batch.sort().all_ticks()
2268    /// # .entries()
2269    /// # }, |mut stream| async move {
2270    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2271    /// # let mut results = Vec::new();
2272    /// # for _ in 0..4 {
2273    /// #     results.push(stream.next().await.unwrap());
2274    /// # }
2275    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2276    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2277    /// # assert_eq!(key1_vals, vec![1, 3]);
2278    /// # assert_eq!(key2_vals, vec![1, 2]);
2279    /// # }));
2280    /// # }
2281    /// ```
2282    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2283    where
2284        B: IsBounded,
2285        K: Ord,
2286        V: Ord,
2287    {
2288        self.entries().sort().into_keyed()
2289    }
2290
2291    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2292    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2293    /// is only present in one of the inputs, its values are passed through as-is). The output has
2294    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2295    ///
2296    /// Currently, both input streams must be [`Bounded`]. This operator will block
2297    /// on the first stream until all its elements are available. In a future version,
2298    /// we will relax the requirement on the `other` stream.
2299    ///
2300    /// # Example
2301    /// ```rust
2302    /// # #[cfg(feature = "deploy")] {
2303    /// # use hydro_lang::prelude::*;
2304    /// # use futures::StreamExt;
2305    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2306    /// let tick = process.tick();
2307    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2308    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2309    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2310    /// # .entries()
2311    /// # }, |mut stream| async move {
2312    /// // { 0: [2, 1], 1: [4, 3] }
2313    /// # let mut results = Vec::new();
2314    /// # for _ in 0..4 {
2315    /// #     results.push(stream.next().await.unwrap());
2316    /// # }
2317    /// # results.sort();
2318    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2319    /// # }));
2320    /// # }
2321    /// ```
2322    pub fn chain<O2: Ordering, R2: Retries>(
2323        self,
2324        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2325    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2326    where
2327        B: IsBounded,
2328        O: MinOrder<O2>,
2329        R: MinRetries<R2>,
2330    {
2331        let this = self.make_bounded();
2332        check_matching_location(&this.location, &other.location);
2333
2334        KeyedStream::new(
2335            this.location.clone(),
2336            HydroNode::Chain {
2337                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2338                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2339                metadata: this.location.new_node_metadata(KeyedStream::<
2340                    K,
2341                    V,
2342                    L,
2343                    Bounded,
2344                    <O as MinOrder<O2>>::Min,
2345                    <R as MinRetries<R2>>::Min,
2346                >::collection_kind()),
2347            },
2348        )
2349    }
2350
2351    /// Emit a keyed stream containing keys shared between the keyed stream and the
2352    /// keyed singleton, where each value in the output keyed stream is a tuple of
2353    /// (the keyed stream's value, the keyed singleton's value).
2354    ///
2355    /// # Example
2356    /// ```rust
2357    /// # #[cfg(feature = "deploy")] {
2358    /// # use hydro_lang::prelude::*;
2359    /// # use futures::StreamExt;
2360    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2361    /// let tick = process.tick();
2362    /// let keyed_data = process
2363    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2364    ///     .into_keyed()
2365    ///     .batch(&tick, nondet!(/** test */));
2366    /// let singleton_data = process
2367    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2368    ///     .into_keyed()
2369    ///     .batch(&tick, nondet!(/** test */))
2370    ///     .first();
2371    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2372    /// # }, |mut stream| async move {
2373    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2374    /// # let mut results = vec![];
2375    /// # for _ in 0..3 {
2376    /// #     results.push(stream.next().await.unwrap());
2377    /// # }
2378    /// # results.sort();
2379    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2380    /// # }));
2381    /// # }
2382    /// ```
2383    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2384        self,
2385        other: KeyedSingleton<K, V2, L, B2>,
2386    ) -> KeyedStream<K, (V, V2), L, B, O, R>
2387    where
2388        K: Eq + Hash + Clone,
2389        V: Clone,
2390    {
2391        let ir_node = if B2::BOUNDED {
2392            HydroNode::JoinHalf {
2393                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2394                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2395                metadata: self
2396                    .location
2397                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2398            }
2399        } else {
2400            HydroNode::Join {
2401                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2402                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2403                metadata: self
2404                    .location
2405                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2406            }
2407        };
2408
2409        KeyedStream::new(self.location.clone(), ir_node)
2410    }
2411
2412    /// Gets the values associated with a specific key from the keyed stream.
2413    /// Returns an empty stream if the key is `None` or there are no associated values.
2414    ///
2415    /// # Example
2416    /// ```rust
2417    /// # #[cfg(feature = "deploy")] {
2418    /// # use hydro_lang::prelude::*;
2419    /// # use futures::StreamExt;
2420    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2421    /// let tick = process.tick();
2422    /// let keyed_data = process
2423    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2424    ///     .into_keyed()
2425    ///     .batch(&tick, nondet!(/** test */));
2426    /// let key = tick.singleton(q!(1));
2427    /// keyed_data.get(key).all_ticks()
2428    /// # }, |mut stream| async move {
2429    /// // 10, 11
2430    /// # let mut results = vec![];
2431    /// # for _ in 0..2 {
2432    /// #     results.push(stream.next().await.unwrap());
2433    /// # }
2434    /// # results.sort();
2435    /// # assert_eq!(results, vec![10, 11]);
2436    /// # }));
2437    /// # }
2438    /// ```
2439    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2440    where
2441        K: Eq + Hash + Clone,
2442        V: Clone,
2443    {
2444        let joined =
2445            self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2446
2447        if O::ORDERING_KIND == StreamOrder::TotalOrder {
2448            joined
2449                .use_ordering_type::<TotalOrder>()
2450                .cast_at_most_one_key()
2451                .map(q!(|(_, (v, _))| v))
2452                .weaken_ordering()
2453        } else {
2454            joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2455        }
2456    }
2457
2458    /// For each value in `self`, find the matching key in `lookup`.
2459    /// The output is a keyed stream with the key from `self`, and a value
2460    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2461    /// If the key is not present in `lookup`, the option will be [`None`].
2462    ///
2463    /// # Example
2464    /// ```rust
2465    /// # #[cfg(feature = "deploy")] {
2466    /// # use hydro_lang::prelude::*;
2467    /// # use futures::StreamExt;
2468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2469    /// # let tick = process.tick();
2470    /// let requests = // { 1: [10, 11], 2: 20 }
2471    /// # process
2472    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2473    /// #     .into_keyed()
2474    /// #     .batch(&tick, nondet!(/** test */));
2475    /// let other_data = // { 10: 100, 11: 110 }
2476    /// # process
2477    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2478    /// #     .into_keyed()
2479    /// #     .batch(&tick, nondet!(/** test */))
2480    /// #     .first();
2481    /// requests.lookup_keyed_singleton(other_data)
2482    /// # .entries().all_ticks()
2483    /// # }, |mut stream| async move {
2484    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2485    /// # let mut results = vec![];
2486    /// # for _ in 0..3 {
2487    /// #     results.push(stream.next().await.unwrap());
2488    /// # }
2489    /// # results.sort();
2490    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2491    /// # }));
2492    /// # }
2493    /// ```
2494    pub fn lookup_keyed_singleton<V2>(
2495        self,
2496        lookup: KeyedSingleton<V, V2, L, Bounded>,
2497    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2498    where
2499        B: IsBounded,
2500        K: Eq + Hash + Clone,
2501        V: Eq + Hash + Clone,
2502        V2: Clone,
2503    {
2504        self.lookup_keyed_stream(lookup.into_keyed_stream().weaken_retries::<R>())
2505    }
2506
2507    /// For each value in `self`, find the matching key in `lookup`.
2508    /// The output is a keyed stream with the key from `self`, and a value
2509    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2510    /// If the key is not present in `lookup`, the option will be [`None`].
2511    ///
2512    /// # Example
2513    /// ```rust
2514    /// # #[cfg(feature = "deploy")] {
2515    /// # use hydro_lang::prelude::*;
2516    /// # use futures::StreamExt;
2517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518    /// # let tick = process.tick();
2519    /// let requests = // { 1: [10, 11], 2: 20 }
2520    /// # process
2521    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2522    /// #     .into_keyed()
2523    /// #     .batch(&tick, nondet!(/** test */));
2524    /// let other_data = // { 10: [100, 101], 11: 110 }
2525    /// # process
2526    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2527    /// #     .into_keyed()
2528    /// #     .batch(&tick, nondet!(/** test */));
2529    /// requests.lookup_keyed_stream(other_data)
2530    /// # .entries().all_ticks()
2531    /// # }, |mut stream| async move {
2532    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2533    /// # let mut results = vec![];
2534    /// # for _ in 0..4 {
2535    /// #     results.push(stream.next().await.unwrap());
2536    /// # }
2537    /// # results.sort();
2538    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2539    /// # }));
2540    /// # }
2541    /// ```
2542    #[expect(clippy::type_complexity, reason = "retries propagation")]
2543    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2544        self,
2545        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2546    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2547    where
2548        B: IsBounded,
2549        K: Eq + Hash + Clone,
2550        V: Eq + Hash + Clone,
2551        V2: Clone,
2552        R: MinRetries<R2>,
2553    {
2554        let inverted = self
2555            .make_bounded()
2556            .entries()
2557            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2558            .into_keyed();
2559        let found = inverted
2560            .clone()
2561            .join_keyed_stream(lookup.clone())
2562            .entries()
2563            .map(q!(|(lookup_value, (key, value))| (
2564                key,
2565                (lookup_value, Some(value))
2566            )))
2567            .into_keyed();
2568        let not_found = inverted
2569            .filter_key_not_in(lookup.keys())
2570            .entries()
2571            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2572            .into_keyed();
2573
2574        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2575    }
2576
2577    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2578    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2579    ///
2580    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2581    /// processed before an acknowledgement is emitted.
2582    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2583        let id = self.location.flow_state().borrow_mut().next_clock_id();
2584        let out_location = Atomic {
2585            tick: Tick {
2586                id,
2587                l: self.location.clone(),
2588            },
2589        };
2590        KeyedStream::new(
2591            out_location.clone(),
2592            HydroNode::BeginAtomic {
2593                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2594                metadata: out_location
2595                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2596            },
2597        )
2598    }
2599
2600    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2601    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2602    /// the order of the input.
2603    ///
2604    /// # Non-Determinism
2605    /// The batch boundaries are non-deterministic and may change across executions.
2606    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2607        self,
2608        tick: &Tick<L2>,
2609        nondet: NonDet,
2610    ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
2611        let _ = nondet;
2612        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2613        KeyedStream::new(
2614            tick.drop_consistency(),
2615            HydroNode::Batch {
2616                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2617                metadata: tick.new_node_metadata(
2618                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2619                ),
2620            },
2621        )
2622    }
2623}
2624
2625impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2626    KeyedStream<(K1, K2), V, L, B, O, R>
2627{
2628    /// Produces a new keyed stream by dropping the first element of the compound key.
2629    ///
2630    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2631    /// of the values under the new keys. The values across groups with the same new key
2632    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2633    ///
2634    /// # Example
2635    /// ```rust
2636    /// # #[cfg(feature = "deploy")] {
2637    /// # use hydro_lang::prelude::*;
2638    /// # use futures::StreamExt;
2639    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2640    /// process
2641    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2642    ///     .into_keyed()
2643    ///     .drop_key_prefix()
2644    /// #   .entries()
2645    /// # }, |mut stream| async move {
2646    /// // { 10: [2, 3], 20: [4] }
2647    /// # let mut results = Vec::new();
2648    /// # for _ in 0..3 {
2649    /// #     results.push(stream.next().await.unwrap());
2650    /// # }
2651    /// # results.sort();
2652    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2653    /// # }));
2654    /// # }
2655    /// ```
2656    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2657        self.entries()
2658            .map(q!(|((_k1, k2), v)| (k2, v)))
2659            .into_keyed()
2660    }
2661}
2662
2663impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R> {
2664    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2665    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2666    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2667    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2668    ///
2669    /// Currently, both input streams must be [`Unbounded`].
2670    ///
2671    /// # Example
2672    /// ```rust
2673    /// # #[cfg(feature = "deploy")] {
2674    /// # use hydro_lang::prelude::*;
2675    /// # use futures::StreamExt;
2676    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2677    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2678    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2679    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2680    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2681    /// numbers1.merge_unordered(numbers2)
2682    /// #   .entries()
2683    /// # }, |mut stream| async move {
2684    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2685    /// # let mut results = Vec::new();
2686    /// # for _ in 0..4 {
2687    /// #     results.push(stream.next().await.unwrap());
2688    /// # }
2689    /// # results.sort();
2690    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2691    /// # }));
2692    /// # }
2693    /// ```
2694    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2695        self,
2696        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2697    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2698    where
2699        R: MinRetries<R2>,
2700    {
2701        KeyedStream::new(
2702            self.location.clone(),
2703            HydroNode::Chain {
2704                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2705                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2706                metadata: self.location.new_node_metadata(KeyedStream::<
2707                    K,
2708                    V,
2709                    L,
2710                    Unbounded,
2711                    NoOrder,
2712                    <R as MinRetries<R2>>::Min,
2713                >::collection_kind()),
2714            },
2715        )
2716    }
2717
2718    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2719    #[deprecated(note = "use `merge_unordered` instead")]
2720    pub fn interleave<O2: Ordering, R2: Retries>(
2721        self,
2722        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2723    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2724    where
2725        R: MinRetries<R2>,
2726    {
2727        self.merge_unordered(other)
2728    }
2729}
2730
2731impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2732where
2733    L: Location<'a>,
2734{
2735    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2736    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2737    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2738    /// used to create the atomic section.
2739    ///
2740    /// # Non-Determinism
2741    /// The batch boundaries are non-deterministic and may change across executions.
2742    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2743        self,
2744        tick: &Tick<L2>,
2745        nondet: NonDet,
2746    ) -> KeyedStream<K, V, Tick<L::DropConsistency>, Bounded, O, R> {
2747        let _ = nondet;
2748        KeyedStream::new(
2749            tick.drop_consistency(),
2750            HydroNode::Batch {
2751                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2752                metadata: tick.new_node_metadata(
2753                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2754                ),
2755            },
2756        )
2757    }
2758
2759    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2760    /// See [`KeyedStream::atomic`] for more details.
2761    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2762        KeyedStream::new(
2763            self.location.tick.l.clone(),
2764            HydroNode::EndAtomic {
2765                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2766                metadata: self
2767                    .location
2768                    .tick
2769                    .l
2770                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2771            },
2772        )
2773    }
2774}
2775
2776impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2777where
2778    L: Location<'a>,
2779{
2780    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2781    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2782    /// each key.
2783    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2784        KeyedStream::new(
2785            self.location.outer().clone(),
2786            HydroNode::YieldConcat {
2787                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2789                    K,
2790                    V,
2791                    L,
2792                    Unbounded,
2793                    O,
2794                    R,
2795                >::collection_kind(
2796                )),
2797            },
2798        )
2799    }
2800
2801    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2802    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2803    /// each key.
2804    ///
2805    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2806    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2807    /// stream's [`Tick`] context.
2808    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2809        let out_location = Atomic {
2810            tick: self.location.clone(),
2811        };
2812
2813        KeyedStream::new(
2814            out_location.clone(),
2815            HydroNode::YieldConcat {
2816                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2817                metadata: out_location.new_node_metadata(KeyedStream::<
2818                    K,
2819                    V,
2820                    Atomic<L>,
2821                    Unbounded,
2822                    O,
2823                    R,
2824                >::collection_kind()),
2825            },
2826        )
2827    }
2828
2829    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2830    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2831    ///
2832    /// This API is particularly useful for stateful computation on batches of data, such as
2833    /// maintaining an accumulated state that is up to date with the current batch.
2834    ///
2835    /// # Example
2836    /// ```rust
2837    /// # #[cfg(feature = "deploy")] {
2838    /// # use hydro_lang::prelude::*;
2839    /// # use futures::StreamExt;
2840    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2841    /// let tick = process.tick();
2842    /// # // ticks are lazy by default, forces the second tick to run
2843    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2844    /// # let batch_first_tick = process
2845    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2846    /// #   .into_keyed()
2847    /// #   .batch(&tick, nondet!(/** test */));
2848    /// # let batch_second_tick = process
2849    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2850    /// #   .into_keyed()
2851    /// #   .batch(&tick, nondet!(/** test */))
2852    /// #   .defer_tick(); // appears on the second tick
2853    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2854    ///
2855    /// input.batch(&tick, nondet!(/** test */))
2856    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2857    ///         *sum += new;
2858    ///     }))).entries().all_ticks()
2859    /// # }, |mut stream| async move {
2860    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2861    /// # let mut results = Vec::new();
2862    /// # for _ in 0..4 {
2863    /// #     results.push(stream.next().await.unwrap());
2864    /// # }
2865    /// # results.sort();
2866    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2867    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2868    /// # results.clear();
2869    /// # for _ in 0..4 {
2870    /// #     results.push(stream.next().await.unwrap());
2871    /// # }
2872    /// # results.sort();
2873    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2874    /// # }));
2875    /// # }
2876    /// ```
2877    pub fn across_ticks<Out: BatchAtomic<'a>>(
2878        self,
2879        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2880    ) -> Out::Batched {
2881        thunk(self.all_ticks_atomic()).batched_atomic()
2882    }
2883
2884    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2885    /// tick `T` always has the entries of `self` at tick `T - 1`.
2886    ///
2887    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2888    ///
2889    /// This operator enables stateful iterative processing with ticks, by sending data from one
2890    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2891    ///
2892    /// # Example
2893    /// ```rust
2894    /// # #[cfg(feature = "deploy")] {
2895    /// # use hydro_lang::prelude::*;
2896    /// # use futures::StreamExt;
2897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2898    /// let tick = process.tick();
2899    /// # // ticks are lazy by default, forces the second tick to run
2900    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2901    /// # let batch_first_tick = process
2902    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2903    /// #   .batch(&tick, nondet!(/** test */))
2904    /// #   .into_keyed();
2905    /// # let batch_second_tick = process
2906    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2907    /// #   .batch(&tick, nondet!(/** test */))
2908    /// #   .defer_tick()
2909    /// #   .into_keyed(); // appears on the second tick
2910    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2911    /// # batch_first_tick.chain(batch_second_tick);
2912    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2913    ///     changes_across_ticks // from the current tick
2914    /// )
2915    /// # .entries().all_ticks()
2916    /// # }, |mut stream| async move {
2917    /// // First tick: { 1: [2, 3] }
2918    /// # let mut results = Vec::new();
2919    /// # for _ in 0..2 {
2920    /// #     results.push(stream.next().await.unwrap());
2921    /// # }
2922    /// # results.sort();
2923    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2924    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2925    /// # results.clear();
2926    /// # for _ in 0..4 {
2927    /// #     results.push(stream.next().await.unwrap());
2928    /// # }
2929    /// # results.sort();
2930    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2931    /// // Third tick: { 1: [4], 2: [5] }
2932    /// # results.clear();
2933    /// # for _ in 0..2 {
2934    /// #     results.push(stream.next().await.unwrap());
2935    /// # }
2936    /// # results.sort();
2937    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2938    /// # }));
2939    /// # }
2940    /// ```
2941    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2942        KeyedStream::new(
2943            self.location.clone(),
2944            HydroNode::DeferTick {
2945                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2946                metadata: self.location.new_node_metadata(KeyedStream::<
2947                    K,
2948                    V,
2949                    Tick<L>,
2950                    Bounded,
2951                    O,
2952                    R,
2953                >::collection_kind()),
2954            },
2955        )
2956    }
2957}
2958
2959#[cfg(test)]
2960mod tests {
2961    #[cfg(feature = "deploy")]
2962    use futures::{SinkExt, StreamExt};
2963    #[cfg(feature = "deploy")]
2964    use hydro_deploy::Deployment;
2965    #[cfg(any(feature = "deploy", feature = "sim"))]
2966    use stageleft::q;
2967
2968    #[cfg(any(feature = "deploy", feature = "sim"))]
2969    use crate::compile::builder::FlowBuilder;
2970    #[cfg(feature = "deploy")]
2971    use crate::live_collections::stream::ExactlyOnce;
2972    #[cfg(feature = "sim")]
2973    use crate::live_collections::stream::{NoOrder, TotalOrder};
2974    #[cfg(any(feature = "deploy", feature = "sim"))]
2975    use crate::location::Location;
2976    #[cfg(feature = "sim")]
2977    use crate::networking::TCP;
2978    #[cfg(any(feature = "deploy", feature = "sim"))]
2979    use crate::nondet::nondet;
2980    #[cfg(feature = "deploy")]
2981    use crate::properties::manual_proof;
2982
2983    #[cfg(feature = "deploy")]
2984    #[tokio::test]
2985    async fn get_unbounded_keyed_stream_bounded_singleton() {
2986        let mut deployment = Deployment::new();
2987
2988        let mut flow = FlowBuilder::new();
2989        let node = flow.process::<()>();
2990        let external = flow.external::<()>();
2991
2992        let (input_send, input_stream) =
2993            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
2994
2995        let key = node.singleton(q!(1));
2996
2997        let out = input_stream
2998            .into_keyed()
2999            .get(key)
3000            .send_bincode_external(&external);
3001
3002        let nodes = flow
3003            .with_process(&node, deployment.Localhost())
3004            .with_external(&external, deployment.Localhost())
3005            .deploy(&mut deployment);
3006
3007        deployment.deploy().await.unwrap();
3008
3009        let mut input_send = nodes.connect(input_send).await;
3010        let mut out = nodes.connect(out).await;
3011
3012        deployment.start().await.unwrap();
3013
3014        // First batch
3015        input_send.send((1, 10)).await.unwrap();
3016        input_send.send((2, 20)).await.unwrap();
3017        assert_eq!(out.next().await.unwrap(), 10);
3018
3019        // Second batch
3020        input_send.send((1, 11)).await.unwrap();
3021        input_send.send((2, 21)).await.unwrap();
3022        assert_eq!(out.next().await.unwrap(), 11);
3023    }
3024
3025    #[cfg(feature = "deploy")]
3026    #[tokio::test]
3027    async fn reduce_watermark_filter() {
3028        let mut deployment = Deployment::new();
3029
3030        let mut flow = FlowBuilder::new();
3031        let node = flow.process::<()>();
3032        let external = flow.external::<()>();
3033
3034        let node_tick = node.tick();
3035        let watermark = node_tick.singleton(q!(2));
3036
3037        let sum = node
3038            .source_stream(q!(tokio_stream::iter([
3039                (0, 100),
3040                (1, 101),
3041                (2, 102),
3042                (2, 102)
3043            ])))
3044            .into_keyed()
3045            .reduce_watermark(
3046                watermark,
3047                q!(|acc, v| {
3048                    *acc += v;
3049                }),
3050            )
3051            .snapshot(&node_tick, nondet!(/** test */))
3052            .entries()
3053            .all_ticks()
3054            .send_bincode_external(&external);
3055
3056        let nodes = flow
3057            .with_process(&node, deployment.Localhost())
3058            .with_external(&external, deployment.Localhost())
3059            .deploy(&mut deployment);
3060
3061        deployment.deploy().await.unwrap();
3062
3063        let mut out = nodes.connect(sum).await;
3064
3065        deployment.start().await.unwrap();
3066
3067        assert_eq!(out.next().await.unwrap(), (2, 204));
3068    }
3069
3070    #[cfg(feature = "deploy")]
3071    #[tokio::test]
3072    async fn reduce_watermark_bounded() {
3073        let mut deployment = Deployment::new();
3074
3075        let mut flow = FlowBuilder::new();
3076        let node = flow.process::<()>();
3077        let external = flow.external::<()>();
3078
3079        let node_tick = node.tick();
3080        let watermark = node_tick.singleton(q!(2));
3081
3082        let sum = node
3083            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
3084            .into_keyed()
3085            .reduce_watermark(
3086                watermark,
3087                q!(|acc, v| {
3088                    *acc += v;
3089                }),
3090            )
3091            .entries()
3092            .send_bincode_external(&external);
3093
3094        let nodes = flow
3095            .with_process(&node, deployment.Localhost())
3096            .with_external(&external, deployment.Localhost())
3097            .deploy(&mut deployment);
3098
3099        deployment.deploy().await.unwrap();
3100
3101        let mut out = nodes.connect(sum).await;
3102
3103        deployment.start().await.unwrap();
3104
3105        assert_eq!(out.next().await.unwrap(), (2, 204));
3106    }
3107
3108    #[cfg(feature = "deploy")]
3109    #[tokio::test]
3110    async fn reduce_watermark_garbage_collect() {
3111        let mut deployment = Deployment::new();
3112
3113        let mut flow = FlowBuilder::new();
3114        let node = flow.process::<()>();
3115        let external = flow.external::<()>();
3116        let (tick_send, tick_trigger) =
3117            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3118
3119        let node_tick = node.tick();
3120        let (watermark_complete_cycle, watermark) =
3121            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3122        let next_watermark = watermark.clone().map(q!(|v| v + 1));
3123        watermark_complete_cycle.complete_next_tick(next_watermark);
3124
3125        let tick_triggered_input = node_tick
3126            .singleton(q!((3, 103)))
3127            .into_stream()
3128            .filter_if(
3129                tick_trigger
3130                    .clone()
3131                    .batch(&node_tick, nondet!(/** test */))
3132                    .first()
3133                    .is_some(),
3134            )
3135            .all_ticks();
3136
3137        let sum = node
3138            .source_stream(q!(tokio_stream::iter([
3139                (0, 100),
3140                (1, 101),
3141                (2, 102),
3142                (2, 102)
3143            ])))
3144            .merge_unordered(tick_triggered_input)
3145            .into_keyed()
3146            .reduce_watermark(
3147                watermark,
3148                q!(
3149                    |acc, v| {
3150                        *acc += v;
3151                    },
3152                    commutative = manual_proof!(/** integer addition is commutative */)
3153                ),
3154            )
3155            .snapshot(&node_tick, nondet!(/** test */))
3156            .entries()
3157            .all_ticks()
3158            .send_bincode_external(&external);
3159
3160        let nodes = flow
3161            .with_default_optimize()
3162            .with_process(&node, deployment.Localhost())
3163            .with_external(&external, deployment.Localhost())
3164            .deploy(&mut deployment);
3165
3166        deployment.deploy().await.unwrap();
3167
3168        let mut tick_send = nodes.connect(tick_send).await;
3169        let mut out_recv = nodes.connect(sum).await;
3170
3171        deployment.start().await.unwrap();
3172
3173        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3174
3175        tick_send.send(()).await.unwrap();
3176
3177        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3178    }
3179
3180    #[cfg(feature = "sim")]
3181    #[test]
3182    #[should_panic]
3183    fn sim_batch_nondet_size() {
3184        let mut flow = FlowBuilder::new();
3185        let node = flow.process::<()>();
3186
3187        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3188
3189        let tick = node.tick();
3190        let out_recv = input
3191            .batch(&tick, nondet!(/** test */))
3192            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3193            .entries()
3194            .all_ticks()
3195            .sim_output();
3196
3197        flow.sim().exhaustive(async || {
3198            out_recv
3199                .assert_yields_only_unordered([(1, vec![1, 2])])
3200                .await;
3201        });
3202    }
3203
3204    #[cfg(feature = "sim")]
3205    #[test]
3206    fn sim_batch_preserves_group_order() {
3207        let mut flow = FlowBuilder::new();
3208        let node = flow.process::<()>();
3209
3210        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3211
3212        let tick = node.tick();
3213        let out_recv = input
3214            .batch(&tick, nondet!(/** test */))
3215            .all_ticks()
3216            .fold_early_stop(
3217                q!(|| 0),
3218                q!(|acc, v| {
3219                    *acc = std::cmp::max(v, *acc);
3220                    *acc >= 2
3221                }),
3222            )
3223            .entries()
3224            .sim_output();
3225
3226        let instances = flow.sim().exhaustive(async || {
3227            out_recv
3228                .assert_yields_only_unordered([(1, 2), (2, 3)])
3229                .await;
3230        });
3231
3232        assert_eq!(instances, 8);
3233        // - three cases: all three in a separate tick (pick where (2, 3) is)
3234        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3235        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3236        // - one case: all three together
3237    }
3238
3239    #[cfg(feature = "sim")]
3240    #[test]
3241    fn sim_batch_unordered_shuffles() {
3242        let mut flow = FlowBuilder::new();
3243        let node = flow.process::<()>();
3244
3245        let input = node
3246            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
3247            .into_keyed()
3248            .weaken_ordering::<NoOrder>();
3249
3250        let tick = node.tick();
3251        let out_recv = input
3252            .batch(&tick, nondet!(/** test */))
3253            .all_ticks()
3254            .entries()
3255            .sim_output();
3256
3257        let instances = flow.sim().exhaustive(async || {
3258            out_recv
3259                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
3260                .await;
3261        });
3262
3263        assert_eq!(instances, 13);
3264        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
3265        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3266        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
3267        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3268    }
3269
3270    #[cfg(feature = "sim")]
3271    #[test]
3272    #[should_panic]
3273    fn sim_observe_order_batched() {
3274        let mut flow = FlowBuilder::new();
3275        let node = flow.process::<()>();
3276
3277        let (port, input) = node.sim_input::<_, NoOrder, _>();
3278
3279        let tick = node.tick();
3280        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3281        let out_recv = batch
3282            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3283            .all_ticks()
3284            .first()
3285            .entries()
3286            .sim_output();
3287
3288        flow.sim().exhaustive(async || {
3289            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3290            out_recv
3291                .assert_yields_only_unordered([(1, 1), (2, 1)])
3292                .await; // fails with assume_ordering
3293        });
3294    }
3295
3296    #[cfg(feature = "sim")]
3297    #[test]
3298    fn sim_observe_order_batched_count() {
3299        let mut flow = FlowBuilder::new();
3300        let node = flow.process::<()>();
3301
3302        let (port, input) = node.sim_input::<_, NoOrder, _>();
3303
3304        let tick = node.tick();
3305        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3306        let out_recv = batch
3307            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3308            .all_ticks()
3309            .entries()
3310            .sim_output();
3311
3312        let instance_count = flow.sim().exhaustive(async || {
3313            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3314            let _ = out_recv.collect_sorted::<Vec<_>>().await;
3315        });
3316
3317        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
3318    }
3319
3320    #[cfg(feature = "sim")]
3321    #[test]
3322    fn sim_top_level_assume_ordering() {
3323        use std::collections::HashMap;
3324
3325        let mut flow = FlowBuilder::new();
3326        let node = flow.process::<()>();
3327
3328        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3329
3330        let out_recv = input
3331            .into_keyed()
3332            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3333            .fold_early_stop(
3334                q!(|| Vec::new()),
3335                q!(|acc, v| {
3336                    acc.push(v);
3337                    acc.len() >= 2
3338                }),
3339            )
3340            .entries()
3341            .sim_output();
3342
3343        let instance_count = flow.sim().exhaustive(async || {
3344            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3345            let out: HashMap<_, _> = out_recv
3346                .collect_sorted::<Vec<_>>()
3347                .await
3348                .into_iter()
3349                .collect();
3350            // Each key accumulates its values; we get one entry per key
3351            assert_eq!(out.len(), 2);
3352        });
3353
3354        assert_eq!(instance_count, 24)
3355    }
3356
3357    #[cfg(feature = "sim")]
3358    #[test]
3359    fn sim_top_level_assume_ordering_cycle_back() {
3360        use std::collections::HashMap;
3361
3362        let mut flow = FlowBuilder::new();
3363        let node = flow.process::<()>();
3364        let node2 = flow.process::<()>();
3365
3366        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3367
3368        let (complete_cycle_back, cycle_back) =
3369            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3370        let ordered = input
3371            .into_keyed()
3372            .merge_unordered(cycle_back)
3373            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3374        complete_cycle_back.complete(
3375            ordered
3376                .clone()
3377                .map(q!(|v| v + 1))
3378                .filter(q!(|v| v % 2 == 1))
3379                .entries()
3380                .send(&node2, TCP.fail_stop().bincode())
3381                .send(&node, TCP.fail_stop().bincode())
3382                .into_keyed(),
3383        );
3384
3385        let out_recv = ordered
3386            .fold_early_stop(
3387                q!(|| Vec::new()),
3388                q!(|acc, v| {
3389                    acc.push(v);
3390                    acc.len() >= 2
3391                }),
3392            )
3393            .entries()
3394            .sim_output();
3395
3396        let mut saw = false;
3397        let instance_count = flow.sim().exhaustive(async || {
3398            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3399            // We want to see [0, 1] - the cycled back value interleaved
3400            in_send.send_many_unordered([(1, 0), (1, 2)]);
3401            let out: HashMap<_, _> = out_recv
3402                .collect_sorted::<Vec<_>>()
3403                .await
3404                .into_iter()
3405                .collect();
3406
3407            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3408            if let Some(values) = out.get(&1)
3409                && *values == vec![0, 1]
3410            {
3411                saw = true;
3412            }
3413        });
3414
3415        assert!(
3416            saw,
3417            "did not see an instance with key 1 having [0, 1] in order"
3418        );
3419        assert_eq!(instance_count, 6);
3420    }
3421
3422    #[cfg(feature = "sim")]
3423    #[test]
3424    fn sim_top_level_assume_ordering_cross_key_cycle() {
3425        use std::collections::HashMap;
3426
3427        // This test demonstrates why releasing one entry at a time is important:
3428        // When one key's observed order cycles back into a different key, we need
3429        // to be able to interleave the cycled-back entry with pending items for
3430        // that other key.
3431        let mut flow = FlowBuilder::new();
3432        let node = flow.process::<()>();
3433        let node2 = flow.process::<()>();
3434
3435        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3436
3437        let (complete_cycle_back, cycle_back) =
3438            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3439        let ordered = input
3440            .into_keyed()
3441            .merge_unordered(cycle_back)
3442            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3443
3444        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3445        complete_cycle_back.complete(
3446            ordered
3447                .clone()
3448                .filter(q!(|v| *v == 10))
3449                .map(q!(|_| 100))
3450                .entries()
3451                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3452                .send(&node2, TCP.fail_stop().bincode())
3453                .send(&node, TCP.fail_stop().bincode())
3454                .into_keyed(),
3455        );
3456
3457        let out_recv = ordered
3458            .fold_early_stop(
3459                q!(|| Vec::new()),
3460                q!(|acc, v| {
3461                    acc.push(v);
3462                    acc.len() >= 2
3463                }),
3464            )
3465            .entries()
3466            .sim_output();
3467
3468        // We want to see an instance where:
3469        // - (1, 10) is released first
3470        // - This causes (2, 100) to be cycled back
3471        // - (2, 100) is released BEFORE (2, 20) which was already pending
3472        let mut saw_cross_key_interleave = false;
3473        let instance_count = flow.sim().exhaustive(async || {
3474            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3475            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3476            let out: HashMap<_, _> = out_recv
3477                .collect_sorted::<Vec<_>>()
3478                .await
3479                .into_iter()
3480                .collect();
3481
3482            // Check if we see the cross-key interleaving:
3483            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3484            if let Some(values) = out.get(&2)
3485                && values.len() >= 2
3486                && values[0] == 100
3487            {
3488                saw_cross_key_interleave = true;
3489            }
3490        });
3491
3492        assert!(
3493            saw_cross_key_interleave,
3494            "did not see an instance where cycled-back 100 was released before pending items for key 2"
3495        );
3496        assert_eq!(instance_count, 60);
3497    }
3498
3499    #[cfg(feature = "sim")]
3500    #[test]
3501    fn sim_top_level_assume_ordering_cycle_back_tick() {
3502        use std::collections::HashMap;
3503
3504        let mut flow = FlowBuilder::new();
3505        let node = flow.process::<()>();
3506        let node2 = flow.process::<()>();
3507
3508        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3509
3510        let (complete_cycle_back, cycle_back) =
3511            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3512        let ordered = input
3513            .into_keyed()
3514            .merge_unordered(cycle_back)
3515            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3516        complete_cycle_back.complete(
3517            ordered
3518                .clone()
3519                .batch(&node.tick(), nondet!(/** test */))
3520                .all_ticks()
3521                .map(q!(|v| v + 1))
3522                .filter(q!(|v| v % 2 == 1))
3523                .entries()
3524                .send(&node2, TCP.fail_stop().bincode())
3525                .send(&node, TCP.fail_stop().bincode())
3526                .into_keyed(),
3527        );
3528
3529        let out_recv = ordered
3530            .fold_early_stop(
3531                q!(|| Vec::new()),
3532                q!(|acc, v| {
3533                    acc.push(v);
3534                    acc.len() >= 2
3535                }),
3536            )
3537            .entries()
3538            .sim_output();
3539
3540        let mut saw = false;
3541        let instance_count = flow.sim().exhaustive(async || {
3542            in_send.send_many_unordered([(1, 0), (1, 2)]);
3543            let out: HashMap<_, _> = out_recv
3544                .collect_sorted::<Vec<_>>()
3545                .await
3546                .into_iter()
3547                .collect();
3548
3549            if let Some(values) = out.get(&1)
3550                && *values == vec![0, 1]
3551            {
3552                saw = true;
3553            }
3554        });
3555
3556        assert!(
3557            saw,
3558            "did not see an instance with key 1 having [0, 1] in order"
3559        );
3560        assert_eq!(instance_count, 58);
3561    }
3562
3563    #[cfg(feature = "sim")]
3564    #[test]
3565    fn sim_entries_partially_ordered_bounded() {
3566        let mut flow = FlowBuilder::new();
3567        let node = flow.process::<()>();
3568
3569        let (port, input) = node.sim_input::<_, TotalOrder, _>();
3570
3571        let tick = node.tick();
3572        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3573        let out_recv = batch
3574            .entries_partially_ordered(nondet!(/** test */))
3575            .all_ticks()
3576            .sim_output();
3577
3578        let instance_count = flow.sim().exhaustive(async || {
3579            port.send((1, 'a'));
3580            port.send((1, 'b'));
3581            port.send((2, 'c'));
3582            let _: Vec<(i32, char)> = out_recv.collect().await;
3583        });
3584
3585        assert_eq!(instance_count, 12);
3586    }
3587
3588    #[cfg(feature = "sim")]
3589    #[test]
3590    fn sim_entries_partially_ordered_top_level() {
3591        let mut flow = FlowBuilder::new();
3592        let node = flow.process::<()>();
3593
3594        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3595
3596        let out_recv = input
3597            .into_keyed()
3598            .entries_partially_ordered(nondet!(/** test */))
3599            .sim_output();
3600
3601        let instance_count = flow.sim().exhaustive(async || {
3602            in_send.send((1, 'a'));
3603            in_send.send((1, 'b'));
3604            in_send.send((2, 'c'));
3605            let _: Vec<(i32, char)> = out_recv.collect().await;
3606        });
3607
3608        assert_eq!(instance_count, 3);
3609    }
3610
3611    #[cfg(feature = "sim")]
3612    #[test]
3613    fn sim_entries_partially_ordered_cycle_back() {
3614        let mut flow = FlowBuilder::new();
3615        let node = flow.process::<()>();
3616        let node2 = flow.process::<()>();
3617
3618        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3619
3620        let (complete_cycle_back, cycle_back) =
3621            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3622        let ordered = input
3623            .into_keyed()
3624            .merge_unordered(cycle_back)
3625            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3626
3627        let flat = ordered
3628            .clone()
3629            .entries_partially_ordered(nondet!(/** test */));
3630
3631        complete_cycle_back.complete(
3632            flat.clone()
3633                .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
3634                .filter(q!(|(_, v)| *v % 2 == 1))
3635                .send(&node2, TCP.fail_stop().bincode())
3636                .send(&node, TCP.fail_stop().bincode())
3637                .into_keyed(),
3638        );
3639
3640        let out_recv = flat.sim_output();
3641
3642        let mut saw = false;
3643        let instance_count = flow.sim().exhaustive(async || {
3644            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
3645            // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
3646            in_send.send_many_unordered([(1, 0), (1, 2)]);
3647            let results: Vec<(i32, i32)> = out_recv.collect().await;
3648
3649            let pos_1 = results.iter().position(|v| *v == (1, 1));
3650            let pos_2 = results.iter().position(|v| *v == (1, 2));
3651            if let (Some(p1), Some(p2)) = (pos_1, pos_2)
3652                && p1 < p2
3653            {
3654                saw = true;
3655            }
3656        });
3657
3658        assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
3659        assert_eq!(instance_count, 78);
3660    }
3661}