// hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{
17 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
18};
19use crate::compile::builder::CycleId;
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
37
38pub mod networking;
39
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // Lazily-built dataflow IR node; `RefCell` allows the node to be converted
    // into a shared `Tee` in place when the stream is cloned.
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
71
72impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
73 for KeyedStream<K, V, L, Unbounded, O, R>
74where
75 L: Location<'a>,
76{
77 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
78 let new_meta = stream
79 .location
80 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
81
82 KeyedStream {
83 location: stream.location,
84 ir_node: RefCell::new(HydroNode::Cast {
85 inner: Box::new(stream.ir_node.into_inner()),
86 metadata: new_meta,
87 }),
88 _phantom: PhantomData,
89 }
90 }
91}
92
93impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
94 for KeyedStream<K, V, L, B, NoOrder, R>
95where
96 L: Location<'a>,
97{
98 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
99 stream.weaken_ordering()
100 }
101}
102
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to the inherent `defer_tick` method (presumably defined later in
    // this file for tick-scoped keyed streams — the fully-qualified path ensures
    // the inherent method is called rather than recursing into this trait method).
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
111
112impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
113 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
114where
115 L: Location<'a>,
116{
117 type Location = Tick<L>;
118
119 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
120 KeyedStream {
121 location: location.clone(),
122 ir_node: RefCell::new(HydroNode::CycleSource {
123 cycle_id,
124 metadata: location.new_node_metadata(
125 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
126 ),
127 }),
128 _phantom: PhantomData,
129 }
130 }
131}
132
133impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
134 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
135where
136 L: Location<'a>,
137{
138 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
139 assert_eq!(
140 Location::id(&self.location),
141 expected_location,
142 "locations do not match"
143 );
144
145 self.location
146 .flow_state()
147 .borrow_mut()
148 .push_root(HydroRoot::CycleSink {
149 cycle_id,
150 input: Box::new(self.ir_node.into_inner()),
151 op_metadata: HydroIrOpMetadata::new(),
152 });
153 }
154}
155
156impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
157 for KeyedStream<K, V, L, B, O, R>
158where
159 L: Location<'a> + NoTick,
160{
161 type Location = L;
162
163 fn create_source(cycle_id: CycleId, location: L) -> Self {
164 KeyedStream {
165 location: location.clone(),
166 ir_node: RefCell::new(HydroNode::CycleSource {
167 cycle_id,
168 metadata: location
169 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
170 }),
171 _phantom: PhantomData,
172 }
173 }
174}
175
176impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
177 for KeyedStream<K, V, L, B, O, R>
178where
179 L: Location<'a> + NoTick,
180{
181 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
182 assert_eq!(
183 Location::id(&self.location),
184 expected_location,
185 "locations do not match"
186 );
187 self.location
188 .flow_state()
189 .borrow_mut()
190 .push_root(HydroRoot::CycleSink {
191 cycle_id,
192 input: Box::new(self.ir_node.into_inner()),
193 op_metadata: HydroIrOpMetadata::new(),
194 });
195 }
196}
197
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // Cloning shares the underlying IR computation instead of duplicating it:
        // the node is first converted in place into a `Tee`, and every clone then
        // holds its own `Tee` handle onto the same shared inner node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Swap a placeholder in so the original node can be moved into the
            // shared `Rc<RefCell<...>>` backing the tee.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self.ir_node` is now guaranteed to be a `Tee`; the clone gets a new
        // `Tee` node pointing at the same shared inner computation.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
225
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield
/// elements and control the processing of future elements.
///
/// The four variants form the cross product of "emit an element or not" and
/// "keep consuming input or stop".
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
238
239impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
240 KeyedStream<K, V, L, B, O, R>
241{
242 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
243 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
244 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
245
246 KeyedStream {
247 location,
248 ir_node: RefCell::new(ir_node),
249 _phantom: PhantomData,
250 }
251 }
252
    /// Returns the [`CollectionKind`] corresponding to this type.
    ///
    /// This reifies the stream's type parameters — boundedness, per-group ordering
    /// and retry guarantees, and the key/value Rust types — into a runtime value
    /// used for IR node metadata.
    pub fn collection_kind() -> CollectionKind {
        CollectionKind::KeyedStream {
            bound: B::BOUND_KIND,
            value_order: O::ORDERING_KIND,
            value_retry: R::RETRIES_KIND,
            key_type: stageleft::quote_type::<K>().into(),
            value_type: stageleft::quote_type::<V>().into(),
        }
    }
263
    /// Returns the [`Location`] where this keyed stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
268
269 /// Explicitly "casts" the keyed stream to a type with a different ordering
270 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
271 /// by the type-system.
272 ///
273 /// # Non-Determinism
274 /// This function is used as an escape hatch, and any mistakes in the
275 /// provided ordering guarantee will propagate into the guarantees
276 /// for the rest of the program.
277 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
278 if O::ORDERING_KIND == O2::ORDERING_KIND {
279 KeyedStream::new(self.location, self.ir_node.into_inner())
280 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
281 // We can always weaken the ordering guarantee
282 KeyedStream::new(
283 self.location.clone(),
284 HydroNode::Cast {
285 inner: Box::new(self.ir_node.into_inner()),
286 metadata: self
287 .location
288 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
289 },
290 )
291 } else {
292 KeyedStream::new(
293 self.location.clone(),
294 HydroNode::ObserveNonDet {
295 inner: Box::new(self.ir_node.into_inner()),
296 trusted: false,
297 metadata: self
298 .location
299 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
300 },
301 )
302 }
303 }
304
305 fn assume_ordering_trusted<O2: Ordering>(
306 self,
307 _nondet: NonDet,
308 ) -> KeyedStream<K, V, L, B, O2, R> {
309 if O::ORDERING_KIND == O2::ORDERING_KIND {
310 KeyedStream::new(self.location, self.ir_node.into_inner())
311 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
312 // We can always weaken the ordering guarantee
313 KeyedStream::new(
314 self.location.clone(),
315 HydroNode::Cast {
316 inner: Box::new(self.ir_node.into_inner()),
317 metadata: self
318 .location
319 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
320 },
321 )
322 } else {
323 KeyedStream::new(
324 self.location.clone(),
325 HydroNode::ObserveNonDet {
326 inner: Box::new(self.ir_node.into_inner()),
327 trusted: true,
328 metadata: self
329 .location
330 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
331 },
332 )
333 }
334 }
335
336 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
337 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
338 /// which is always safe because that is the weakest possible guarantee.
339 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
340 self.weaken_ordering::<NoOrder>()
341 }
342
343 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
344 /// enforcing that `O2` is weaker than the input ordering guarantee.
345 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
346 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
347 self.assume_ordering::<O2>(nondet)
348 }
349
350 /// Explicitly "casts" the keyed stream to a type with a different retries
351 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
352 /// be proven by the type-system.
353 ///
354 /// # Non-Determinism
355 /// This function is used as an escape hatch, and any mistakes in the
356 /// provided retries guarantee will propagate into the guarantees
357 /// for the rest of the program.
358 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
359 if R::RETRIES_KIND == R2::RETRIES_KIND {
360 KeyedStream::new(self.location, self.ir_node.into_inner())
361 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
362 // We can always weaken the retries guarantee
363 KeyedStream::new(
364 self.location.clone(),
365 HydroNode::Cast {
366 inner: Box::new(self.ir_node.into_inner()),
367 metadata: self
368 .location
369 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
370 },
371 )
372 } else {
373 KeyedStream::new(
374 self.location.clone(),
375 HydroNode::ObserveNonDet {
376 inner: Box::new(self.ir_node.into_inner()),
377 trusted: false,
378 metadata: self
379 .location
380 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
381 },
382 )
383 }
384 }
385
386 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
387 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
388 /// which is always safe because that is the weakest possible guarantee.
389 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
390 self.weaken_retries::<AtLeastOnce>()
391 }
392
393 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
394 /// enforcing that `R2` is weaker than the input retries guarantee.
395 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
396 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
397 self.assume_retries::<R2>(nondet)
398 }
399
400 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
401 /// implies that `O == TotalOrder`.
402 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
403 where
404 O: IsOrdered,
405 {
406 self.assume_ordering(nondet!(/** no-op */))
407 }
408
409 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
410 /// implies that `R == ExactlyOnce`.
411 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
412 where
413 R: IsExactlyOnce,
414 {
415 self.assume_retries(nondet!(/** no-op */))
416 }
417
418 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
419 /// implies that `B == Bounded`.
420 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
421 where
422 B: IsBounded,
423 {
424 KeyedStream::new(self.location, self.ir_node.into_inner())
425 }
426
427 /// Flattens the keyed stream into an unordered stream of key-value pairs.
428 ///
429 /// # Example
430 /// ```rust
431 /// # #[cfg(feature = "deploy")] {
432 /// # use hydro_lang::prelude::*;
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
435 /// process
436 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
437 /// .into_keyed()
438 /// .entries()
439 /// # }, |mut stream| async move {
440 /// // (1, 2), (1, 3), (2, 4) in any order
441 /// # let mut results = Vec::new();
442 /// # for _ in 0..3 {
443 /// # results.push(stream.next().await.unwrap());
444 /// # }
445 /// # results.sort();
446 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
447 /// # }));
448 /// # }
449 /// ```
450 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
451 Stream::new(
452 self.location.clone(),
453 HydroNode::Cast {
454 inner: Box::new(self.ir_node.into_inner()),
455 metadata: self
456 .location
457 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
458 },
459 )
460 }
461
462 /// Flattens the keyed stream into an unordered stream of only the values.
463 ///
464 /// # Example
465 /// ```rust
466 /// # #[cfg(feature = "deploy")] {
467 /// # use hydro_lang::prelude::*;
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
470 /// process
471 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
472 /// .into_keyed()
473 /// .values()
474 /// # }, |mut stream| async move {
475 /// // 2, 3, 4 in any order
476 /// # let mut results = Vec::new();
477 /// # for _ in 0..3 {
478 /// # results.push(stream.next().await.unwrap());
479 /// # }
480 /// # results.sort();
481 /// # assert_eq!(results, vec![2, 3, 4]);
482 /// # }));
483 /// # }
484 /// ```
485 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
486 self.entries().map(q!(|(_, v)| v))
487 }
488
489 /// Flattens the keyed stream into an unordered stream of just the keys.
490 ///
491 /// # Example
492 /// ```rust
493 /// # #[cfg(feature = "deploy")] {
494 /// # use hydro_lang::prelude::*;
495 /// # use futures::StreamExt;
496 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
497 /// # process
498 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
499 /// # .into_keyed()
500 /// # .keys()
501 /// # }, |mut stream| async move {
502 /// // 1, 2 in any order
503 /// # let mut results = Vec::new();
504 /// # for _ in 0..2 {
505 /// # results.push(stream.next().await.unwrap());
506 /// # }
507 /// # results.sort();
508 /// # assert_eq!(results, vec![1, 2]);
509 /// # }));
510 /// # }
511 /// ```
512 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
513 where
514 K: Eq + Hash,
515 {
516 self.entries().map(q!(|(k, _)| k)).unique()
517 }
518
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Defer splicing the user closure until the location context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift the per-value closure to operate on `(k, v)` pairs, leaving keys untouched.
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
570
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Defer splicing the user closure until the location context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so the user closure can consume a `(K, V)` pair while
        // the original key is retained for re-grouping the output.
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
629
    /// Prepends a new value to the key of each element in the stream, producing a new
    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
    /// occurs and the elements in each group preserve their original order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .prefix_key(q!(|&(k, _)| k % 2))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { (1, 1): [2, 3], (0, 2): [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // Defer splicing the user closure until the location context is available.
        // Note: `f` only borrows the pair, so no cloning of key or value is needed.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                // Compound key `(new_prefix, original_key)`; value is unchanged.
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
685
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Defer splicing the user closure until the location context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Lift the value predicate to a pair predicate that ignores the key.
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
736
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_with_key<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> bool + 'a,
    {
        // The user closure already takes the `(K, V)` pair by reference, so it can
        // be spliced directly without a lifting wrapper (unlike `filter`).
        let filter_f = f
            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
            .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
786
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Defer splicing the user closure until the location context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift the value-level `Option` to the pair level, re-attaching the key on `Some`.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
839
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..1 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Defer splicing the user closure until the location context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so the user closure can consume the pair while the
        // original key is retained to re-group any `Some` output.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
896
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both sides must be materialized at the same location to be joined.
        check_matching_location(&self.location, &other.location);

        // Cross with the singleton at the `(K, V)` pair level, then reshape
        // `((k, v), o2)` into `(k, (v, o2))` and restore the keyed grouping.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
948
949 /// For each value `v` in each group, transform `v` using `f` and then treat the
950 /// result as an [`Iterator`] to produce values one by one within the same group.
951 /// The implementation for [`Iterator`] for the output type `I` must produce items
952 /// in a **deterministic** order.
953 ///
954 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
955 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
956 ///
957 /// # Example
958 /// ```rust
959 /// # #[cfg(feature = "deploy")] {
960 /// # use hydro_lang::prelude::*;
961 /// # use futures::StreamExt;
962 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
963 /// process
964 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
965 /// .into_keyed()
966 /// .flat_map_ordered(q!(|x| x))
967 /// # .entries()
968 /// # }, |mut stream| async move {
969 /// // { 1: [2, 3, 4], 2: [5, 6] }
970 /// # let mut results = Vec::new();
971 /// # for _ in 0..5 {
972 /// # results.push(stream.next().await.unwrap());
973 /// # }
974 /// # results.sort();
975 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
976 /// # }));
977 /// # }
978 /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure so it can be captured inside the quoted
        // wrapper below with the correct context.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Adapt the value-level closure to the flat `(K, V)` entry representation:
        // every item the user's iterator yields is re-paired with a clone of the key,
        // keeping it in the same group.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1007
1008 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1009 /// for the output type `I` to produce items in any order.
1010 ///
1011 /// # Example
1012 /// ```rust
1013 /// # #[cfg(feature = "deploy")] {
1014 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1017 /// process
1018 /// .source_iter(q!(vec![
1019 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1020 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1021 /// ]))
1022 /// .into_keyed()
1023 /// .flat_map_unordered(q!(|x| x))
1024 /// # .entries()
1025 /// # }, |mut stream| async move {
1026 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1027 /// # let mut results = Vec::new();
1028 /// # for _ in 0..4 {
1029 /// # results.push(stream.next().await.unwrap());
1030 /// # }
1031 /// # results.sort();
1032 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1033 /// # }));
1034 /// # }
1035 /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure so it can be captured inside the quoted
        // wrapper below with the correct context.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Same lowering as `flat_map_ordered`; only the output ordering guarantee
        // differs (`NoOrder`), reflecting that the user's iterator may be unordered.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1064
1065 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1066 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1067 /// items in a **deterministic** order.
1068 ///
1069 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1070 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # #[cfg(feature = "deploy")] {
1075 /// # use hydro_lang::prelude::*;
1076 /// # use futures::StreamExt;
1077 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078 /// process
1079 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1080 /// .into_keyed()
1081 /// .flatten_ordered()
1082 /// # .entries()
1083 /// # }, |mut stream| async move {
1084 /// // { 1: [2, 3, 4], 2: [5, 6] }
1085 /// # let mut results = Vec::new();
1086 /// # for _ in 0..5 {
1087 /// # results.push(stream.next().await.unwrap());
1088 /// # }
1089 /// # results.sort();
1090 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1091 /// # }));
1092 /// # }
1093 /// ```
    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Identity flat-map: each value is itself the iterator to expand.
        self.flat_map_ordered(q!(|d| d))
    }
1101
1102 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1103 /// for the value type `V` to produce items in any order.
1104 ///
1105 /// # Example
1106 /// ```rust
1107 /// # #[cfg(feature = "deploy")] {
1108 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1109 /// # use futures::StreamExt;
1110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1111 /// process
1112 /// .source_iter(q!(vec![
1113 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1114 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1115 /// ]))
1116 /// .into_keyed()
1117 /// .flatten_unordered()
1118 /// # .entries()
1119 /// # }, |mut stream| async move {
1120 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1121 /// # let mut results = Vec::new();
1122 /// # for _ in 0..4 {
1123 /// # results.push(stream.next().await.unwrap());
1124 /// # }
1125 /// # results.sort();
1126 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1127 /// # }));
1128 /// # }
1129 /// ```
    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Identity flat-map; the `NoOrder` output type records that items within a
        // group may be produced in a non-deterministic order.
        self.flat_map_unordered(q!(|d| d))
    }
1137
1138 /// An operator which allows you to "inspect" each element of a stream without
1139 /// modifying it. The closure `f` is called on a reference to each value. This is
1140 /// mainly useful for debugging, and should not be used to generate side-effects.
1141 ///
1142 /// # Example
1143 /// ```rust
1144 /// # #[cfg(feature = "deploy")] {
1145 /// # use hydro_lang::prelude::*;
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148 /// process
1149 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1150 /// .into_keyed()
1151 /// .inspect(q!(|v| println!("{}", v)))
1152 /// # .entries()
1153 /// # }, |mut stream| async move {
1154 /// # let mut results = Vec::new();
1155 /// # for _ in 0..3 {
1156 /// # results.push(stream.next().await.unwrap());
1157 /// # }
1158 /// # results.sort();
1159 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1160 /// # }));
1161 /// # }
1162 /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        // Defer splicing so the closure can be embedded in the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Adapt the value-level closure to the flat `(K, V)` entry representation by
        // projecting out the value component.
        let inspect_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1184
1185 /// An operator which allows you to "inspect" each element of a stream without
1186 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1187 /// mainly useful for debugging, and should not be used to generate side-effects.
1188 ///
1189 /// # Example
1190 /// ```rust
1191 /// # #[cfg(feature = "deploy")] {
1192 /// # use hydro_lang::prelude::*;
1193 /// # use futures::StreamExt;
1194 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1195 /// process
1196 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1197 /// .into_keyed()
1198 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1199 /// # .entries()
1200 /// # }, |mut stream| async move {
1201 /// # let mut results = Vec::new();
1202 /// # for _ in 0..3 {
1203 /// # results.push(stream.next().await.unwrap());
1204 /// # }
1205 /// # results.sort();
1206 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1207 /// # }));
1208 /// # }
1209 /// ```
    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&(K, V)) + 'a,
    {
        // The user closure already takes the flat `(K, V)` entry, so it can be
        // spliced directly without a wrapping adapter.
        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1225
1226 /// An operator which allows you to "name" a `HydroNode`.
1227 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1228 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1229 {
1230 let mut node = self.ir_node.borrow_mut();
1231 let metadata = node.metadata_mut();
1232 metadata.tag = Some(name.to_owned());
1233 }
1234 self
1235 }
1236
1237 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1238 ///
1239 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1240 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1241 /// early by returning `None`.
1242 ///
1243 /// The function takes a mutable reference to the accumulator and the current element, and returns
1244 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1245 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1246 ///
1247 /// # Example
1248 /// ```rust
1249 /// # #[cfg(feature = "deploy")] {
1250 /// # use hydro_lang::prelude::*;
1251 /// # use futures::StreamExt;
1252 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1253 /// process
1254 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1255 /// .into_keyed()
1256 /// .scan(
1257 /// q!(|| 0),
1258 /// q!(|acc, x| {
1259 /// *acc += x;
1260 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1261 /// }),
1262 /// )
1263 /// # .entries()
1264 /// # }, |mut stream| async move {
1265 /// // Output: { 0: [1], 1: [3, 7] }
1266 /// # let mut results = Vec::new();
1267 /// # for _ in 0..3 {
1268 /// # results.push(stream.next().await.unwrap());
1269 /// # }
1270 /// # results.sort();
1271 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1272 /// # }));
1273 /// # }
1274 /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Defer splicing so the closure can be embedded in the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Lower onto `generator`: `Some(out)` becomes `Yield(out)`, and `None`
        // becomes `Break`, terminating the group without emitting a final value.
        // The `IsOrdered` / `IsExactlyOnce` bounds witness that the casts are lossless.
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1302
1303 /// Iteratively processes the elements in each group using a state machine that can yield
1304 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1305 /// syntax in Rust, without requiring special syntax.
1306 ///
1307 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1308 /// state for each group. The second argument defines the processing logic, taking in a
1309 /// mutable reference to the group's state and the value to be processed. It emits a
1310 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1311 /// should be processed.
1312 ///
1313 /// # Example
1314 /// ```rust
1315 /// # #[cfg(feature = "deploy")] {
1316 /// # use hydro_lang::prelude::*;
1317 /// # use futures::StreamExt;
1318 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1319 /// process
1320 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1321 /// .into_keyed()
1322 /// .generator(
1323 /// q!(|| 0),
1324 /// q!(|acc, x| {
1325 /// *acc += x;
1326 /// if *acc > 100 {
1327 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1328 /// "done!".to_owned()
1329 /// )
1330 /// } else if *acc % 2 == 0 {
1331 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1332 /// "even".to_owned()
1333 /// )
1334 /// } else {
1335 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1336 /// }
1337 /// }),
1338 /// )
1339 /// # .entries()
1340 /// # }, |mut stream| async move {
1341 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1342 /// # let mut results = Vec::new();
1343 /// # for _ in 0..3 {
1344 /// # results.push(stream.next().await.unwrap());
1345 /// # }
1346 /// # results.sort();
1347 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1348 /// # }));
1349 /// # }
1350 /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Defer splicing of both user closures so they can be captured inside the
        // quoted scan closure below with the correct context.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The `IsOrdered` / `IsExactlyOnce` bounds witness that these casts are lossless.
        let this = self.make_totally_ordered().make_exactly_once();

        // Lowering: a single `Scan` over the flat `(K, V)` entries with per-key state
        // in a `HashMap<K, Option<A>>`. A `None` state marks a terminated group
        // (after `Return` / `Break`); the scan emits `Option<(K, U)>` where `None`
        // is a placeholder for "no output this element".
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // Lazily initialize this key's state on first encounter.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already terminated: ignore the input but keep the scan alive.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.into_inner()),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Drop the `None` placeholders (from `Continue` / terminated groups) by
        // flat-mapping with the identity over `Option<(K, U)>`.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location, flatten_node)
    }
1424
1425 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1426 /// in-order across the values in each group. But the aggregation function returns a boolean,
1427 /// which when true indicates that the aggregated result is complete and can be released to
1428 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1429 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1430 /// normal stream elements.
1431 ///
1432 /// # Example
1433 /// ```rust
1434 /// # #[cfg(feature = "deploy")] {
1435 /// # use hydro_lang::prelude::*;
1436 /// # use futures::StreamExt;
1437 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438 /// process
1439 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1440 /// .into_keyed()
1441 /// .fold_early_stop(
1442 /// q!(|| 0),
1443 /// q!(|acc, x| {
1444 /// *acc += x;
1445 /// x % 2 == 0
1446 /// }),
1447 /// )
1448 /// # .entries()
1449 /// # }, |mut stream| async move {
1450 /// // Output: { 0: 2, 1: 9 }
1451 /// # let mut results = Vec::new();
1452 /// # for _ in 0..2 {
1453 /// # results.push(stream.next().await.unwrap());
1454 /// # }
1455 /// # results.sort();
1456 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1457 /// # }));
1458 /// # }
1459 /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Defer splicing so the closures can be embedded in the quoted wrappers below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Lower onto `generator`: the accumulator is wrapped in `Option` so it can be
        // moved out (via `take`) exactly once, when the user closure returns `true`.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` never calls the closure again after `Return`,
                    // so the state is always `Some` here.
                    unreachable!()
                }
            }),
        );

        // Each group emits at most one value, so the keyed stream can be cast to a
        // `KeyedSingleton` with the value side marked bounded.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }
1501
1502 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1503 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1504 /// otherwise the first element would be non-deterministic.
1505 ///
1506 /// # Example
1507 /// ```rust
1508 /// # #[cfg(feature = "deploy")] {
1509 /// # use hydro_lang::prelude::*;
1510 /// # use futures::StreamExt;
1511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512 /// process
1513 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1514 /// .into_keyed()
1515 /// .first()
1516 /// # .entries()
1517 /// # }, |mut stream| async move {
1518 /// // Output: { 0: 2, 1: 3 }
1519 /// # let mut results = Vec::new();
1520 /// # for _ in 0..2 {
1521 /// # results.push(stream.next().await.unwrap());
1522 /// # }
1523 /// # results.sort();
1524 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1525 /// # }));
1526 /// # }
1527 /// ```
    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
    {
        // Fold that stops immediately after the first element of each group. The
        // `unwrap` is safe: the fold only completes after `*acc = Some(v)` has run.
        self.fold_early_stop(
            q!(|| None),
            q!(|acc, v| {
                *acc = Some(v);
                true
            }),
        )
        .map(q!(|v| v.unwrap()))
    }
1543
1544 /// Assigns a zero-based index to each value within each key group, emitting
1545 /// `(K, (index, V))` tuples with per-key sequential indices.
1546 ///
1547 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1548 /// This is a streaming operator that processes elements as they arrive.
1549 ///
1550 /// # Example
1551 /// ```rust
1552 /// # #[cfg(feature = "deploy")] {
1553 /// # use hydro_lang::prelude::*;
1554 /// # use futures::StreamExt;
1555 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1556 /// process
1557 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1558 /// .into_keyed()
1559 /// .enumerate()
1560 /// # .entries()
1561 /// # }, |mut stream| async move {
1562 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1563 /// # let mut results = Vec::new();
1564 /// # for _ in 0..3 {
1565 /// # results.push(stream.next().await.unwrap());
1566 /// # }
1567 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1568 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1569 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1570 /// # assert_eq!(key2, vec![(0, 20)]);
1571 /// # }));
1572 /// # }
1573 /// ```
    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Eq + Hash + Clone,
    {
        // Per-key counter via `scan`: emit the current index with the value, then
        // bump the accumulator. Never returns `None`, so groups never terminate.
        self.scan(
            q!(|| 0),
            q!(|acc, next| {
                let curr = *acc;
                *acc += 1;
                Some((curr, next))
            }),
        )
    }
1589
1590 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1591 ///
1592 /// # Example
1593 /// ```rust
1594 /// # #[cfg(feature = "deploy")] {
1595 /// # use hydro_lang::prelude::*;
1596 /// # use futures::StreamExt;
1597 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1598 /// let tick = process.tick();
1599 /// let numbers = process
1600 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1601 /// .into_keyed();
1602 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1603 /// batch
1604 /// .value_counts()
1605 /// .entries()
1606 /// .all_ticks()
1607 /// # }, |mut stream| async move {
1608 /// // (1, 3), (2, 2)
1609 /// # let mut results = Vec::new();
1610 /// # for _ in 0..2 {
1611 /// # results.push(stream.next().await.unwrap());
1612 /// # }
1613 /// # results.sort();
1614 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1615 /// # }));
1616 /// # }
1617 /// ```
    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
    where
        R: IsExactlyOnce,
        K: Eq + Hash,
    {
        // Counting only increments per element, so the order of values within a group
        // cannot affect the result (or any intermediate count); asserting ordering
        // here is therefore safe, as the trusted nondet justification states.
        self.make_exactly_once()
            .assume_ordering_trusted(
                nondet!(/** ordering within each group affects neither result nor intermediates */),
            )
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
    }
1629
1630 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1631 /// group via the `comb` closure.
1632 ///
1633 /// Depending on the input stream guarantees, the closure may need to be commutative
1634 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1635 ///
1636 /// If the input and output value types are the same and do not require initialization then use
1637 /// [`KeyedStream::reduce`].
1638 ///
1639 /// # Example
1640 /// ```rust
1641 /// # #[cfg(feature = "deploy")] {
1642 /// # use hydro_lang::prelude::*;
1643 /// # use futures::StreamExt;
1644 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1645 /// let tick = process.tick();
1646 /// let numbers = process
1647 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1648 /// .into_keyed();
1649 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1650 /// batch
1651 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1652 /// .entries()
1653 /// .all_ticks()
1654 /// # }, |mut stream| async move {
1655 /// // (1, false), (2, true)
1656 /// # let mut results = Vec::new();
1657 /// # for _ in 0..2 {
1658 /// # results.push(stream.next().await.unwrap());
1659 /// # }
1660 /// # results.sort();
1661 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1662 /// # }));
1663 /// # }
1664 /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Extract the algebraic-property proof carried by the combinator's
        // `AggFuncAlgebra` tag and register it for downstream checking.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // Sound because `ValidCommutativityFor<O>` / `ValidIdempotenceFor<R>` ensure
        // the combinator tolerates whatever reordering/retries `O` and `R` permit.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    A,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1698
1699 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1700 /// group via the `comb` closure.
1701 ///
1702 /// Depending on the input stream guarantees, the closure may need to be commutative
1703 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1704 ///
1705 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1706 ///
1707 /// # Example
1708 /// ```rust
1709 /// # #[cfg(feature = "deploy")] {
1710 /// # use hydro_lang::prelude::*;
1711 /// # use futures::StreamExt;
1712 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1713 /// let tick = process.tick();
1714 /// let numbers = process
1715 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1716 /// .into_keyed();
1717 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1718 /// batch
1719 /// .reduce(q!(|acc, x| *acc |= x))
1720 /// .entries()
1721 /// .all_ticks()
1722 /// # }, |mut stream| async move {
1723 /// // (1, false), (2, true)
1724 /// # let mut results = Vec::new();
1725 /// # for _ in 0..2 {
1726 /// # results.push(stream.next().await.unwrap());
1727 /// # }
1728 /// # results.sort();
1729 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1730 /// # }));
1731 /// # }
1732 /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Extract the algebraic-property proof carried by the combinator's
        // `AggFuncAlgebra` tag and register it for downstream checking.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Sound because `ValidCommutativityFor<O>` / `ValidIdempotenceFor<R>` ensure
        // the combinator tolerates whatever reordering/retries `O` and `R` permit.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1763
1764 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1765 /// are automatically deleted.
1766 ///
1767 /// Depending on the input stream guarantees, the closure may need to be commutative
1768 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1769 ///
1770 /// # Example
1771 /// ```rust
1772 /// # #[cfg(feature = "deploy")] {
1773 /// # use hydro_lang::prelude::*;
1774 /// # use futures::StreamExt;
1775 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1776 /// let tick = process.tick();
1777 /// let watermark = tick.singleton(q!(1));
1778 /// let numbers = process
1779 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1780 /// .into_keyed();
1781 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1782 /// batch
1783 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1784 /// .entries()
1785 /// .all_ticks()
1786 /// # }, |mut stream| async move {
1787 /// // (2, true)
1788 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1789 /// # }));
1790 /// # }
1791 /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        // The watermark lives in a tick at this stream's root location, so compare
        // the stream's root against the tick's outer location.
        check_matching_location(&self.location.root(), other.location.outer());
        // Extract and register the combinator's algebraic-property proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Sound because `ValidCommutativityFor<O>` / `ValidIdempotenceFor<R>` ensure
        // the combinator tolerates whatever reordering/retries `O` and `R` permit.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        // NOTE(review): whether keys exactly equal to the watermark are retained is
        // decided by the `ReduceKeyedWatermark` lowering, which is not visible here;
        // the rustdoc says "less than" but its example's shown output (watermark 1,
        // only `(2, true)`) hints the boundary may be inclusive — confirm and align.
        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                watermark: Box::new(other.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1828
1829 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1830 /// whose keys are not in the bounded stream.
1831 ///
1832 /// # Example
1833 /// ```rust
1834 /// # #[cfg(feature = "deploy")] {
1835 /// # use hydro_lang::prelude::*;
1836 /// # use futures::StreamExt;
1837 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1838 /// let tick = process.tick();
1839 /// let keyed_stream = process
1840 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1841 /// .batch(&tick, nondet!(/** test */))
1842 /// .into_keyed();
1843 /// let keys_to_remove = process
1844 /// .source_iter(q!(vec![1, 2]))
1845 /// .batch(&tick, nondet!(/** test */));
1846 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1847 /// # .entries()
1848 /// # }, |mut stream| async move {
1849 /// // { 3: ['c'], 4: ['d'] }
1850 /// # let mut results = Vec::new();
1851 /// # for _ in 0..2 {
1852 /// # results.push(stream.next().await.unwrap());
1853 /// # }
1854 /// # results.sort();
1855 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1856 /// # }));
1857 /// # }
1858 /// ```
1859 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1860 self,
1861 other: Stream<K, L, Bounded, O2, R2>,
1862 ) -> Self
1863 where
1864 K: Eq + Hash,
1865 {
1866 check_matching_location(&self.location, &other.location);
1867
1868 KeyedStream::new(
1869 self.location.clone(),
1870 HydroNode::AntiJoin {
1871 pos: Box::new(self.ir_node.into_inner()),
1872 neg: Box::new(other.ir_node.into_inner()),
1873 metadata: self.location.new_node_metadata(Self::collection_kind()),
1874 },
1875 )
1876 }
1877
1878 /// Emit a keyed stream containing keys shared between two keyed streams,
1879 /// where each value in the output keyed stream is a tuple of
1880 /// (self's value, other's value).
1881 /// If there are multiple values for the same key, this performs a cross product
1882 /// for each matching key.
1883 ///
1884 /// # Example
1885 /// ```rust
1886 /// # #[cfg(feature = "deploy")] {
1887 /// # use hydro_lang::prelude::*;
1888 /// # use futures::StreamExt;
1889 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1890 /// let tick = process.tick();
1891 /// let keyed_data = process
1892 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1893 /// .into_keyed()
1894 /// .batch(&tick, nondet!(/** test */));
1895 /// let other_data = process
1896 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1897 /// .into_keyed()
1898 /// .batch(&tick, nondet!(/** test */));
1899 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1900 /// # }, |mut stream| async move {
1901 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1902 /// # let mut results = vec![];
1903 /// # for _ in 0..4 {
1904 /// # results.push(stream.next().await.unwrap());
1905 /// # }
1906 /// # results.sort();
1907 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1908 /// # }));
1909 /// # }
1910 /// ```
1911 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1912 self,
1913 other: KeyedStream<K, V2, L, B, O2, R2>,
1914 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1915 where
1916 K: Eq + Hash,
1917 R: MinRetries<R2>,
1918 {
1919 self.entries().join(other.entries()).into_keyed()
1920 }
1921
1922 /// Deduplicates values within each key group, emitting each unique value per key
1923 /// exactly once.
1924 ///
1925 /// # Example
1926 /// ```rust
1927 /// # #[cfg(feature = "deploy")] {
1928 /// # use hydro_lang::prelude::*;
1929 /// # use futures::StreamExt;
1930 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1931 /// process
1932 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
1933 /// .into_keyed()
1934 /// .unique()
1935 /// # .entries()
1936 /// # }, |mut stream| async move {
1937 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
1938 /// # let mut results = Vec::new();
1939 /// # for _ in 0..4 {
1940 /// # results.push(stream.next().await.unwrap());
1941 /// # }
1942 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1943 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1944 /// # key1.sort();
1945 /// # key2.sort();
1946 /// # assert_eq!(key1, vec![10, 20]);
1947 /// # assert_eq!(key2, vec![20, 30]);
1948 /// # }));
1949 /// # }
1950 /// ```
1951 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
1952 where
1953 K: Eq + Hash + Clone,
1954 V: Eq + Hash + Clone,
1955 {
1956 self.entries().unique().into_keyed()
1957 }
1958
1959 /// Sorts the values within each key group in ascending order.
1960 ///
1961 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
1962 /// each group. This operator will block until all elements in the input stream
1963 /// are available, so it requires the input stream to be [`Bounded`].
1964 ///
1965 /// # Example
1966 /// ```rust
1967 /// # #[cfg(feature = "deploy")] {
1968 /// # use hydro_lang::prelude::*;
1969 /// # use futures::StreamExt;
1970 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1971 /// let tick = process.tick();
1972 /// let numbers = process
1973 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
1974 /// .into_keyed();
1975 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1976 /// batch.sort().all_ticks()
1977 /// # .entries()
1978 /// # }, |mut stream| async move {
1979 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
1980 /// # let mut results = Vec::new();
1981 /// # for _ in 0..4 {
1982 /// # results.push(stream.next().await.unwrap());
1983 /// # }
1984 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1985 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1986 /// # assert_eq!(key1_vals, vec![1, 3]);
1987 /// # assert_eq!(key2_vals, vec![1, 2]);
1988 /// # }));
1989 /// # }
1990 /// ```
1991 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
1992 where
1993 B: IsBounded,
1994 K: Ord,
1995 V: Ord,
1996 {
1997 self.entries().sort().into_keyed()
1998 }
1999
2000 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2001 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2002 /// is only present in one of the inputs, its values are passed through as-is). The output has
2003 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2004 ///
2005 /// Currently, both input streams must be [`Bounded`]. This operator will block
2006 /// on the first stream until all its elements are available. In a future version,
2007 /// we will relax the requirement on the `other` stream.
2008 ///
2009 /// # Example
2010 /// ```rust
2011 /// # #[cfg(feature = "deploy")] {
2012 /// # use hydro_lang::prelude::*;
2013 /// # use futures::StreamExt;
2014 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2015 /// let tick = process.tick();
2016 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2017 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2018 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2019 /// # .entries()
2020 /// # }, |mut stream| async move {
2021 /// // { 0: [2, 1], 1: [4, 3] }
2022 /// # let mut results = Vec::new();
2023 /// # for _ in 0..4 {
2024 /// # results.push(stream.next().await.unwrap());
2025 /// # }
2026 /// # results.sort();
2027 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2028 /// # }));
2029 /// # }
2030 /// ```
2031 pub fn chain<O2: Ordering, R2: Retries>(
2032 self,
2033 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2034 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2035 where
2036 B: IsBounded,
2037 O: MinOrder<O2>,
2038 R: MinRetries<R2>,
2039 {
2040 let this = self.make_bounded();
2041 check_matching_location(&this.location, &other.location);
2042
2043 KeyedStream::new(
2044 this.location.clone(),
2045 HydroNode::Chain {
2046 first: Box::new(this.ir_node.into_inner()),
2047 second: Box::new(other.ir_node.into_inner()),
2048 metadata: this.location.new_node_metadata(KeyedStream::<
2049 K,
2050 V,
2051 L,
2052 Bounded,
2053 <O as MinOrder<O2>>::Min,
2054 <R as MinRetries<R2>>::Min,
2055 >::collection_kind()),
2056 },
2057 )
2058 }
2059
2060 /// Emit a keyed stream containing keys shared between the keyed stream and the
2061 /// keyed singleton, where each value in the output keyed stream is a tuple of
2062 /// (the keyed stream's value, the keyed singleton's value).
2063 ///
2064 /// # Example
2065 /// ```rust
2066 /// # #[cfg(feature = "deploy")] {
2067 /// # use hydro_lang::prelude::*;
2068 /// # use futures::StreamExt;
2069 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2070 /// let tick = process.tick();
2071 /// let keyed_data = process
2072 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2073 /// .into_keyed()
2074 /// .batch(&tick, nondet!(/** test */));
2075 /// let singleton_data = process
2076 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2077 /// .into_keyed()
2078 /// .batch(&tick, nondet!(/** test */))
2079 /// .first();
2080 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2081 /// # }, |mut stream| async move {
2082 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2083 /// # let mut results = vec![];
2084 /// # for _ in 0..3 {
2085 /// # results.push(stream.next().await.unwrap());
2086 /// # }
2087 /// # results.sort();
2088 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2089 /// # }));
2090 /// # }
2091 /// ```
2092 pub fn join_keyed_singleton<V2: Clone>(
2093 self,
2094 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2095 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2096 where
2097 B: IsBounded,
2098 K: Eq + Hash,
2099 {
2100 keyed_singleton
2101 .join_keyed_stream(self.make_bounded())
2102 .map(q!(|(v2, v)| (v, v2)))
2103 }
2104
2105 /// Gets the values associated with a specific key from the keyed stream.
2106 ///
2107 /// # Example
2108 /// ```rust
2109 /// # #[cfg(feature = "deploy")] {
2110 /// # use hydro_lang::prelude::*;
2111 /// # use futures::StreamExt;
2112 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2113 /// let tick = process.tick();
2114 /// let keyed_data = process
2115 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2116 /// .into_keyed()
2117 /// .batch(&tick, nondet!(/** test */));
2118 /// let key = tick.singleton(q!(1));
2119 /// keyed_data.get(key).all_ticks()
2120 /// # }, |mut stream| async move {
2121 /// // 10, 11 in any order
2122 /// # let mut results = vec![];
2123 /// # for _ in 0..2 {
2124 /// # results.push(stream.next().await.unwrap());
2125 /// # }
2126 /// # results.sort();
2127 /// # assert_eq!(results, vec![10, 11]);
2128 /// # }));
2129 /// # }
2130 /// ```
2131 pub fn get(self, key: Singleton<K, L, Bounded>) -> Stream<V, L, Bounded, NoOrder, R>
2132 where
2133 B: IsBounded,
2134 K: Eq + Hash,
2135 {
2136 self.make_bounded()
2137 .entries()
2138 .join(key.into_stream().map(q!(|k| (k, ()))))
2139 .map(q!(|(_, (v, _))| v))
2140 }
2141
2142 /// For each value in `self`, find the matching key in `lookup`.
2143 /// The output is a keyed stream with the key from `self`, and a value
2144 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2145 /// If the key is not present in `lookup`, the option will be [`None`].
2146 ///
2147 /// # Example
2148 /// ```rust
2149 /// # #[cfg(feature = "deploy")] {
2150 /// # use hydro_lang::prelude::*;
2151 /// # use futures::StreamExt;
2152 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2153 /// # let tick = process.tick();
2154 /// let requests = // { 1: [10, 11], 2: 20 }
2155 /// # process
2156 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2157 /// # .into_keyed()
2158 /// # .batch(&tick, nondet!(/** test */));
2159 /// let other_data = // { 10: 100, 11: 110 }
2160 /// # process
2161 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2162 /// # .into_keyed()
2163 /// # .batch(&tick, nondet!(/** test */))
2164 /// # .first();
2165 /// requests.lookup_keyed_singleton(other_data)
2166 /// # .entries().all_ticks()
2167 /// # }, |mut stream| async move {
2168 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2169 /// # let mut results = vec![];
2170 /// # for _ in 0..3 {
2171 /// # results.push(stream.next().await.unwrap());
2172 /// # }
2173 /// # results.sort();
2174 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2175 /// # }));
2176 /// # }
2177 /// ```
2178 pub fn lookup_keyed_singleton<V2>(
2179 self,
2180 lookup: KeyedSingleton<V, V2, L, Bounded>,
2181 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2182 where
2183 B: IsBounded,
2184 K: Eq + Hash + Clone,
2185 V: Eq + Hash + Clone,
2186 V2: Clone,
2187 {
2188 self.lookup_keyed_stream(
2189 lookup
2190 .into_keyed_stream()
2191 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2192 )
2193 }
2194
2195 /// For each value in `self`, find the matching key in `lookup`.
2196 /// The output is a keyed stream with the key from `self`, and a value
2197 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2198 /// If the key is not present in `lookup`, the option will be [`None`].
2199 ///
2200 /// # Example
2201 /// ```rust
2202 /// # #[cfg(feature = "deploy")] {
2203 /// # use hydro_lang::prelude::*;
2204 /// # use futures::StreamExt;
2205 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2206 /// # let tick = process.tick();
2207 /// let requests = // { 1: [10, 11], 2: 20 }
2208 /// # process
2209 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2210 /// # .into_keyed()
2211 /// # .batch(&tick, nondet!(/** test */));
2212 /// let other_data = // { 10: [100, 101], 11: 110 }
2213 /// # process
2214 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2215 /// # .into_keyed()
2216 /// # .batch(&tick, nondet!(/** test */));
2217 /// requests.lookup_keyed_stream(other_data)
2218 /// # .entries().all_ticks()
2219 /// # }, |mut stream| async move {
2220 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2221 /// # let mut results = vec![];
2222 /// # for _ in 0..4 {
2223 /// # results.push(stream.next().await.unwrap());
2224 /// # }
2225 /// # results.sort();
2226 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2227 /// # }));
2228 /// # }
2229 /// ```
2230 #[expect(clippy::type_complexity, reason = "retries propagation")]
2231 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2232 self,
2233 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2234 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2235 where
2236 B: IsBounded,
2237 K: Eq + Hash + Clone,
2238 V: Eq + Hash + Clone,
2239 V2: Clone,
2240 R: MinRetries<R2>,
2241 {
2242 let inverted = self
2243 .make_bounded()
2244 .entries()
2245 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2246 .into_keyed();
2247 let found = inverted
2248 .clone()
2249 .join_keyed_stream(lookup.clone())
2250 .entries()
2251 .map(q!(|(lookup_value, (key, value))| (
2252 key,
2253 (lookup_value, Some(value))
2254 )))
2255 .into_keyed();
2256 let not_found = inverted
2257 .filter_key_not_in(lookup.keys())
2258 .entries()
2259 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2260 .into_keyed();
2261
2262 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2263 }
2264
2265 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2266 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2267 ///
2268 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2269 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2270 /// argument that declares where the stream will be atomically processed. Batching a stream into
2271 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2272 /// [`Tick`] will introduce asynchrony.
2273 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2274 let out_location = Atomic { tick: tick.clone() };
2275 KeyedStream::new(
2276 out_location.clone(),
2277 HydroNode::BeginAtomic {
2278 inner: Box::new(self.ir_node.into_inner()),
2279 metadata: out_location
2280 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2281 },
2282 )
2283 }
2284
2285 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2286 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2287 /// the order of the input.
2288 ///
2289 /// # Non-Determinism
2290 /// The batch boundaries are non-deterministic and may change across executions.
2291 pub fn batch(
2292 self,
2293 tick: &Tick<L>,
2294 nondet: NonDet,
2295 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2296 let _ = nondet;
2297 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2298 KeyedStream::new(
2299 tick.clone(),
2300 HydroNode::Batch {
2301 inner: Box::new(self.ir_node.into_inner()),
2302 metadata: tick.new_node_metadata(
2303 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2304 ),
2305 },
2306 )
2307 }
2308}
2309
2310impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2311 KeyedStream<(K1, K2), V, L, B, O, R>
2312{
2313 /// Produces a new keyed stream by dropping the first element of the compound key.
2314 ///
2315 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2316 /// of the values under the new keys. The values across groups with the same new key
2317 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2318 ///
2319 /// # Example
2320 /// ```rust
2321 /// # #[cfg(feature = "deploy")] {
2322 /// # use hydro_lang::prelude::*;
2323 /// # use futures::StreamExt;
2324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2325 /// process
2326 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2327 /// .into_keyed()
2328 /// .drop_key_prefix()
2329 /// # .entries()
2330 /// # }, |mut stream| async move {
2331 /// // { 10: [2, 3], 20: [4] }
2332 /// # let mut results = Vec::new();
2333 /// # for _ in 0..3 {
2334 /// # results.push(stream.next().await.unwrap());
2335 /// # }
2336 /// # results.sort();
2337 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2338 /// # }));
2339 /// # }
2340 /// ```
2341 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2342 self.entries()
2343 .map(q!(|((_k1, k2), v)| (k2, v)))
2344 .into_keyed()
2345 }
2346}
2347
2348impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2349 KeyedStream<K, V, L, Unbounded, O, R>
2350{
2351 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2352 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2353 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2354 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2355 ///
2356 /// Currently, both input streams must be [`Unbounded`].
2357 ///
2358 /// # Example
2359 /// ```rust
2360 /// # #[cfg(feature = "deploy")] {
2361 /// # use hydro_lang::prelude::*;
2362 /// # use futures::StreamExt;
2363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2364 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2365 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2366 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2367 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2368 /// numbers1.interleave(numbers2)
2369 /// # .entries()
2370 /// # }, |mut stream| async move {
2371 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2372 /// # let mut results = Vec::new();
2373 /// # for _ in 0..4 {
2374 /// # results.push(stream.next().await.unwrap());
2375 /// # }
2376 /// # results.sort();
2377 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2378 /// # }));
2379 /// # }
2380 /// ```
2381 pub fn interleave<O2: Ordering, R2: Retries>(
2382 self,
2383 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2384 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2385 where
2386 R: MinRetries<R2>,
2387 {
2388 KeyedStream::new(
2389 self.location.clone(),
2390 HydroNode::Chain {
2391 first: Box::new(self.ir_node.into_inner()),
2392 second: Box::new(other.ir_node.into_inner()),
2393 metadata: self.location.new_node_metadata(KeyedStream::<
2394 K,
2395 V,
2396 L,
2397 Unbounded,
2398 NoOrder,
2399 <R as MinRetries<R2>>::Min,
2400 >::collection_kind()),
2401 },
2402 )
2403 }
2404}
2405
2406impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2407where
2408 L: Location<'a> + NoTick,
2409{
2410 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2411 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2412 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2413 /// used to create the atomic section.
2414 ///
2415 /// # Non-Determinism
2416 /// The batch boundaries are non-deterministic and may change across executions.
2417 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2418 let _ = nondet;
2419 KeyedStream::new(
2420 self.location.clone().tick,
2421 HydroNode::Batch {
2422 inner: Box::new(self.ir_node.into_inner()),
2423 metadata: self.location.tick.new_node_metadata(KeyedStream::<
2424 K,
2425 V,
2426 Tick<L>,
2427 Bounded,
2428 O,
2429 R,
2430 >::collection_kind(
2431 )),
2432 },
2433 )
2434 }
2435
2436 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2437 /// See [`KeyedStream::atomic`] for more details.
2438 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2439 KeyedStream::new(
2440 self.location.tick.l.clone(),
2441 HydroNode::EndAtomic {
2442 inner: Box::new(self.ir_node.into_inner()),
2443 metadata: self
2444 .location
2445 .tick
2446 .l
2447 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2448 },
2449 )
2450 }
2451}
2452
2453impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2454where
2455 L: Location<'a>,
2456{
2457 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2458 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2459 /// each key.
2460 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2461 KeyedStream::new(
2462 self.location.outer().clone(),
2463 HydroNode::YieldConcat {
2464 inner: Box::new(self.ir_node.into_inner()),
2465 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2466 K,
2467 V,
2468 L,
2469 Unbounded,
2470 O,
2471 R,
2472 >::collection_kind(
2473 )),
2474 },
2475 )
2476 }
2477
2478 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2479 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2480 /// each key.
2481 ///
2482 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2483 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2484 /// stream's [`Tick`] context.
2485 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2486 let out_location = Atomic {
2487 tick: self.location.clone(),
2488 };
2489
2490 KeyedStream::new(
2491 out_location.clone(),
2492 HydroNode::YieldConcat {
2493 inner: Box::new(self.ir_node.into_inner()),
2494 metadata: out_location.new_node_metadata(KeyedStream::<
2495 K,
2496 V,
2497 Atomic<L>,
2498 Unbounded,
2499 O,
2500 R,
2501 >::collection_kind()),
2502 },
2503 )
2504 }
2505
2506 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2507 /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2508 ///
2509 /// This API is particularly useful for stateful computation on batches of data, such as
2510 /// maintaining an accumulated state that is up to date with the current batch.
2511 ///
2512 /// # Example
2513 /// ```rust
2514 /// # #[cfg(feature = "deploy")] {
2515 /// # use hydro_lang::prelude::*;
2516 /// # use futures::StreamExt;
2517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518 /// let tick = process.tick();
2519 /// # // ticks are lazy by default, forces the second tick to run
2520 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2521 /// # let batch_first_tick = process
2522 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2523 /// # .into_keyed()
2524 /// # .batch(&tick, nondet!(/** test */));
2525 /// # let batch_second_tick = process
2526 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2527 /// # .into_keyed()
2528 /// # .batch(&tick, nondet!(/** test */))
2529 /// # .defer_tick(); // appears on the second tick
2530 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2531 ///
2532 /// input.batch(&tick, nondet!(/** test */))
2533 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2534 /// *sum += new;
2535 /// }))).entries().all_ticks()
2536 /// # }, |mut stream| async move {
2537 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2538 /// # let mut results = Vec::new();
2539 /// # for _ in 0..4 {
2540 /// # results.push(stream.next().await.unwrap());
2541 /// # }
2542 /// # results.sort();
2543 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2544 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2545 /// # results.clear();
2546 /// # for _ in 0..4 {
2547 /// # results.push(stream.next().await.unwrap());
2548 /// # }
2549 /// # results.sort();
2550 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2551 /// # }));
2552 /// # }
2553 /// ```
2554 pub fn across_ticks<Out: BatchAtomic>(
2555 self,
2556 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2557 ) -> Out::Batched {
2558 thunk(self.all_ticks_atomic()).batched_atomic()
2559 }
2560
2561 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2562 /// tick `T` always has the entries of `self` at tick `T - 1`.
2563 ///
2564 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2565 ///
2566 /// This operator enables stateful iterative processing with ticks, by sending data from one
2567 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2568 ///
2569 /// # Example
2570 /// ```rust
2571 /// # #[cfg(feature = "deploy")] {
2572 /// # use hydro_lang::prelude::*;
2573 /// # use futures::StreamExt;
2574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2575 /// let tick = process.tick();
2576 /// # // ticks are lazy by default, forces the second tick to run
2577 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2578 /// # let batch_first_tick = process
2579 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2580 /// # .batch(&tick, nondet!(/** test */))
2581 /// # .into_keyed();
2582 /// # let batch_second_tick = process
2583 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2584 /// # .batch(&tick, nondet!(/** test */))
2585 /// # .defer_tick()
2586 /// # .into_keyed(); // appears on the second tick
2587 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2588 /// # batch_first_tick.chain(batch_second_tick);
2589 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2590 /// changes_across_ticks // from the current tick
2591 /// )
2592 /// # .entries().all_ticks()
2593 /// # }, |mut stream| async move {
2594 /// // First tick: { 1: [2, 3] }
2595 /// # let mut results = Vec::new();
2596 /// # for _ in 0..2 {
2597 /// # results.push(stream.next().await.unwrap());
2598 /// # }
2599 /// # results.sort();
2600 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2601 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2602 /// # results.clear();
2603 /// # for _ in 0..4 {
2604 /// # results.push(stream.next().await.unwrap());
2605 /// # }
2606 /// # results.sort();
2607 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2608 /// // Third tick: { 1: [4], 2: [5] }
2609 /// # results.clear();
2610 /// # for _ in 0..2 {
2611 /// # results.push(stream.next().await.unwrap());
2612 /// # }
2613 /// # results.sort();
2614 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2615 /// # }));
2616 /// # }
2617 /// ```
2618 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2619 KeyedStream::new(
2620 self.location.clone(),
2621 HydroNode::DeferTick {
2622 input: Box::new(self.ir_node.into_inner()),
2623 metadata: self.location.new_node_metadata(KeyedStream::<
2624 K,
2625 V,
2626 Tick<L>,
2627 Bounded,
2628 O,
2629 R,
2630 >::collection_kind()),
2631 },
2632 )
2633 }
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638 #[cfg(feature = "deploy")]
2639 use futures::{SinkExt, StreamExt};
2640 #[cfg(feature = "deploy")]
2641 use hydro_deploy::Deployment;
2642 #[cfg(any(feature = "deploy", feature = "sim"))]
2643 use stageleft::q;
2644
2645 #[cfg(any(feature = "deploy", feature = "sim"))]
2646 use crate::compile::builder::FlowBuilder;
2647 #[cfg(feature = "deploy")]
2648 use crate::live_collections::stream::ExactlyOnce;
2649 #[cfg(feature = "sim")]
2650 use crate::live_collections::stream::{NoOrder, TotalOrder};
2651 #[cfg(any(feature = "deploy", feature = "sim"))]
2652 use crate::location::Location;
2653 #[cfg(any(feature = "deploy", feature = "sim"))]
2654 use crate::nondet::nondet;
2655 #[cfg(feature = "deploy")]
2656 use crate::properties::manual_proof;
2657
2658 #[cfg(feature = "deploy")]
2659 #[tokio::test]
2660 async fn reduce_watermark_filter() {
2661 let mut deployment = Deployment::new();
2662
2663 let mut flow = FlowBuilder::new();
2664 let node = flow.process::<()>();
2665 let external = flow.external::<()>();
2666
2667 let node_tick = node.tick();
2668 let watermark = node_tick.singleton(q!(1));
2669
2670 let sum = node
2671 .source_stream(q!(tokio_stream::iter([
2672 (0, 100),
2673 (1, 101),
2674 (2, 102),
2675 (2, 102)
2676 ])))
2677 .into_keyed()
2678 .reduce_watermark(
2679 watermark,
2680 q!(|acc, v| {
2681 *acc += v;
2682 }),
2683 )
2684 .snapshot(&node_tick, nondet!(/** test */))
2685 .entries()
2686 .all_ticks()
2687 .send_bincode_external(&external);
2688
2689 let nodes = flow
2690 .with_process(&node, deployment.Localhost())
2691 .with_external(&external, deployment.Localhost())
2692 .deploy(&mut deployment);
2693
2694 deployment.deploy().await.unwrap();
2695
2696 let mut out = nodes.connect(sum).await;
2697
2698 deployment.start().await.unwrap();
2699
2700 assert_eq!(out.next().await.unwrap(), (2, 204));
2701 }
2702
2703 #[cfg(feature = "deploy")]
2704 #[tokio::test]
2705 async fn reduce_watermark_bounded() {
2706 let mut deployment = Deployment::new();
2707
2708 let mut flow = FlowBuilder::new();
2709 let node = flow.process::<()>();
2710 let external = flow.external::<()>();
2711
2712 let node_tick = node.tick();
2713 let watermark = node_tick.singleton(q!(1));
2714
2715 let sum = node
2716 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2717 .into_keyed()
2718 .reduce_watermark(
2719 watermark,
2720 q!(|acc, v| {
2721 *acc += v;
2722 }),
2723 )
2724 .entries()
2725 .send_bincode_external(&external);
2726
2727 let nodes = flow
2728 .with_process(&node, deployment.Localhost())
2729 .with_external(&external, deployment.Localhost())
2730 .deploy(&mut deployment);
2731
2732 deployment.deploy().await.unwrap();
2733
2734 let mut out = nodes.connect(sum).await;
2735
2736 deployment.start().await.unwrap();
2737
2738 assert_eq!(out.next().await.unwrap(), (2, 204));
2739 }
2740
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // External port that lets the test drive new input into the flow after it has started.
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark starts at 1 and is advanced by 1 on every tick via a feedback cycle,
        // so earlier keys become eligible for garbage collection as ticks progress.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // A (3, 103) entry that is only released once the external trigger fires;
        // `filter_if_some` gates the singleton on the first triggered batch.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                // The commutativity proof is required because `interleave` produces
                // non-deterministically ordered values within each group.
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        // `with_default_optimize` is used here so the watermark-based garbage collection
        // path is exercised; presumably the optimizer inserts the GC logic — TODO confirm.
        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        // First, key 2's sum is visible from the initial source data.
        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Trigger the deferred (3, 103) entry; its arrival after earlier state has been
        // collected should still produce the correct result for the new key.
        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
2811
2812 #[cfg(feature = "sim")]
2813 #[test]
2814 #[should_panic]
2815 fn sim_batch_nondet_size() {
2816 let mut flow = FlowBuilder::new();
2817 let node = flow.process::<()>();
2818
2819 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2820
2821 let tick = node.tick();
2822 let out_recv = input
2823 .batch(&tick, nondet!(/** test */))
2824 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2825 .entries()
2826 .all_ticks()
2827 .sim_output();
2828
2829 flow.sim().exhaustive(async || {
2830 out_recv
2831 .assert_yields_only_unordered([(1, vec![1, 2])])
2832 .await;
2833 });
2834 }
2835
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        // Exhaustively explores batch boundary placements and checks that
        // batching never reorders values within a key: the max-fold always
        // yields (1, 2) and (2, 3) regardless of how items are split into ticks.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        // The 8 explored schedules are enumerated below.
        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2870
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        // Like `sim_batch_preserves_group_order`, but the input is weakened to
        // `NoOrder`, so the simulator additionally explores shuffles of values
        // within a key — raising the schedule count from 8 to 13.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
2901
2902 #[cfg(feature = "sim")]
2903 #[test]
2904 #[should_panic]
2905 fn sim_observe_order_batched() {
2906 let mut flow = FlowBuilder::new();
2907 let node = flow.process::<()>();
2908
2909 let (port, input) = node.sim_input::<_, NoOrder, _>();
2910
2911 let tick = node.tick();
2912 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2913 let out_recv = batch
2914 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2915 .all_ticks()
2916 .first()
2917 .entries()
2918 .sim_output();
2919
2920 flow.sim().exhaustive(async || {
2921 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2922 out_recv
2923 .assert_yields_only_unordered([(1, 1), (2, 1)])
2924 .await; // fails with assume_ordering
2925 });
2926 }
2927
2928 #[cfg(feature = "sim")]
2929 #[test]
2930 fn sim_observe_order_batched_count() {
2931 let mut flow = FlowBuilder::new();
2932 let node = flow.process::<()>();
2933
2934 let (port, input) = node.sim_input::<_, NoOrder, _>();
2935
2936 let tick = node.tick();
2937 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2938 let out_recv = batch
2939 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2940 .all_ticks()
2941 .entries()
2942 .sim_output();
2943
2944 let instance_count = flow.sim().exhaustive(async || {
2945 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2946 let _ = out_recv.collect_sorted::<Vec<_>>().await;
2947 });
2948
2949 assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
2950 }
2951
2952 #[cfg(feature = "sim")]
2953 #[test]
2954 fn sim_top_level_assume_ordering() {
2955 use std::collections::HashMap;
2956
2957 let mut flow = FlowBuilder::new();
2958 let node = flow.process::<()>();
2959
2960 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2961
2962 let out_recv = input
2963 .into_keyed()
2964 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2965 .fold_early_stop(
2966 q!(|| Vec::new()),
2967 q!(|acc, v| {
2968 acc.push(v);
2969 acc.len() >= 2
2970 }),
2971 )
2972 .entries()
2973 .sim_output();
2974
2975 let instance_count = flow.sim().exhaustive(async || {
2976 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
2977 let out: HashMap<_, _> = out_recv
2978 .collect_sorted::<Vec<_>>()
2979 .await
2980 .into_iter()
2981 .collect();
2982 // Each key accumulates its values; we get one entry per key
2983 assert_eq!(out.len(), 2);
2984 });
2985
2986 assert_eq!(instance_count, 24)
2987 }
2988
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        // The observed (assumed-total) order feeds back into its own input via
        // a forward ref: each released value v re-enters as v + 1 when v + 1 is
        // odd. The test checks the simulator can interleave a cycled-back value
        // with still-pending inputs of the same key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        // Each key stops after collecting two values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }
3048
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .into_keyed(),
        );

        // Each key stops after collecting two values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }
3122
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        // Same feedback shape as `sim_top_level_assume_ordering_cycle_back`,
        // but the cycled-back values pass through an explicit tick batch; the
        // desired [0, 1] interleaving must still be reachable. The schedule
        // count rises from 6 to 58 — presumably because batching adds extra
        // boundary choices; see the non-tick variant for comparison.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback path: batch per tick, then re-emit v + 1 when it is odd.
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        // Each key stops after collecting two values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Look for the schedule where key 1 saw 0 then the cycled-back 1.
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }
3180 }
3181}