// hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37 AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38 manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58/// ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // The dataflow IR node backing this collection; swapped out for
    // `HydroNode::Placeholder` whenever ownership of the subtree is
    // transferred (see the `Drop` and `Clone` impls below).
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow graph under construction, used to register roots
    // (e.g. when the stream is dropped unconsumed).
    pub(crate) flow_state: FlowState,

    // Marker for the purely type-level parameters: key/value types and the
    // boundedness / ordering / retries guarantees.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
75
/// When a keyed stream is dropped without being consumed, its IR subtree is
/// registered as a `HydroRoot::Null` sink so the flow graph still retains it.
impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
    fn drop(&mut self) {
        // Take ownership of the node, leaving a placeholder behind.
        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
        // Skip nodes already moved out (`Placeholder`) and nodes whose
        // underlying subtree is still reachable from other handles.
        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
                input: Box::new(ir_node),
                op_metadata: HydroIrOpMetadata::new(),
            });
        }
    }
}
87
/// A [`Bounded`] keyed stream can always be upcast to [`Unbounded`]; this is
/// represented in the IR as a `Cast` node over the original subtree.
impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
    for KeyedStream<K, V, L, Unbounded, O, R>
where
    L: Location<'a>,
{
    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
        // Metadata for the output node carries the `Unbounded` collection kind.
        let new_meta = stream
            .location
            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());

        KeyedStream {
            location: stream.location.clone(),
            flow_state: stream.flow_state.clone(),
            ir_node: RefCell::new(HydroNode::Cast {
                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
                metadata: new_meta,
            }),
            _phantom: PhantomData,
        }
    }
}
109
/// A [`TotalOrder`] keyed stream can always weaken its per-group ordering to
/// [`NoOrder`], since [`NoOrder`] is the weaker guarantee.
impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
    for KeyedStream<K, V, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
119
/// Tick-local bounded keyed streams support tick deferral; this delegates to
/// the inherent `KeyedStream::defer_tick` method (defined elsewhere in this
/// module).
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
128
/// Allows a keyed stream inside a tick to be the receiving end of a
/// `TickCycle` (feedback across tick iterations); the source side
/// materializes as a `HydroNode::CycleSource` with the given cycle id.
impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
        KeyedStream {
            flow_state: location.flow_state().clone(),
            location: location.clone(),
            ir_node: RefCell::new(HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(
                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
                ),
            }),
            _phantom: PhantomData,
        }
    }
}
150
/// Completes a `TickCycle` by registering this stream as the `CycleSink` that
/// feeds the matching `CycleSource`.
impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The cycle's source and sink must live at the same location.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );

        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
173
/// Allows a keyed stream at a top-level (non-tick) location to be created via
/// a forward reference; the placeholder materializes as a
/// `HydroNode::CycleSource` with the given cycle id.
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for KeyedStream<K, V, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    fn create_source(cycle_id: CycleId, location: L) -> Self {
        KeyedStream {
            flow_state: location.flow_state().clone(),
            location: location.clone(),
            ir_node: RefCell::new(HydroNode::CycleSource {
                cycle_id,
                metadata: location
                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
            }),
            _phantom: PhantomData,
        }
    }
}
194
/// Completes a forward reference by registering this stream as the
/// `CycleSink` feeding the forward-declared `CycleSource`.
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for KeyedStream<K, V, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The forward reference's source and sink must share a location.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
216
/// Cloning a keyed stream introduces sharing in the IR: the underlying node
/// is rewritten into a `Tee` on first clone, and every handle (including the
/// original) then points at the same shared inner subtree.
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // On the first clone, wrap our node in a `Tee` so the subtree can be
        // shared by multiple handles.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // The node is now guaranteed to be a `Tee`; hand out another `Tee`
        // referencing the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // Unreachable: the branch above just installed a `Tee`.
            unreachable!()
        }
    }
}
245
246/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
247/// control the processing of future elements.
/// Standard derives so callers can debug-print, copy, and compare generator
/// directives (each variant only wraps the yielded element, if any).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260 KeyedStream<K, V, L, B, O, R>
261{
    /// Internal constructor wrapping an already-built IR node; debug-asserts
    /// that the node's metadata agrees with this collection's location and
    /// collection kind.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        let flow_state = location.flow_state().clone();
        KeyedStream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
274
275 /// Returns the [`CollectionKind`] corresponding to this type.
    pub fn collection_kind() -> CollectionKind {
        CollectionKind::KeyedStream {
            // Reify the type-level guarantees as runtime tags for the IR.
            bound: B::BOUND_KIND,
            value_order: O::ORDERING_KIND,
            value_retry: R::RETRIES_KIND,
            // Quoted Rust types of the key and value, for IR bookkeeping.
            key_type: stageleft::quote_type::<K>().into(),
            value_type: stageleft::quote_type::<V>().into(),
        }
    }
285
    /// Returns a reference to the [`Location`] where this keyed stream is
    /// being materialized; callers can clone it if they need ownership.
    pub fn location(&self) -> &L {
        &self.location
    }
290
291 /// Explicitly "casts" the keyed stream to a type with a different ordering
292 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
293 /// by the type-system.
294 ///
295 /// # Non-Determinism
296 /// This function is used as an escape hatch, and any mistakes in the
297 /// provided ordering guarantee will propagate into the guarantees
298 /// for the rest of the program.
299 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
300 if O::ORDERING_KIND == O2::ORDERING_KIND {
301 KeyedStream::new(
302 self.location.clone(),
303 self.ir_node.replace(HydroNode::Placeholder),
304 )
305 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
306 // We can always weaken the ordering guarantee
307 KeyedStream::new(
308 self.location.clone(),
309 HydroNode::Cast {
310 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
311 metadata: self
312 .location
313 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
314 },
315 )
316 } else {
317 KeyedStream::new(
318 self.location.clone(),
319 HydroNode::ObserveNonDet {
320 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
321 trusted: false,
322 metadata: self
323 .location
324 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
325 },
326 )
327 }
328 }
329
330 fn assume_ordering_trusted<O2: Ordering>(
331 self,
332 _nondet: NonDet,
333 ) -> KeyedStream<K, V, L, B, O2, R> {
334 if O::ORDERING_KIND == O2::ORDERING_KIND {
335 KeyedStream::new(
336 self.location.clone(),
337 self.ir_node.replace(HydroNode::Placeholder),
338 )
339 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
340 // We can always weaken the ordering guarantee
341 KeyedStream::new(
342 self.location.clone(),
343 HydroNode::Cast {
344 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
345 metadata: self
346 .location
347 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
348 },
349 )
350 } else {
351 KeyedStream::new(
352 self.location.clone(),
353 HydroNode::ObserveNonDet {
354 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
355 trusted: true,
356 metadata: self
357 .location
358 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
359 },
360 )
361 }
362 }
363
364 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
365 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
366 /// which is always safe because that is the weakest possible guarantee.
367 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
368 self.weaken_ordering::<NoOrder>()
369 }
370
371 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
372 /// enforcing that `O2` is weaker than the input ordering guarantee.
373 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
374 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
375 self.assume_ordering::<O2>(nondet)
376 }
377
378 /// Explicitly "casts" the keyed stream to a type with a different retries
379 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
380 /// be proven by the type-system.
381 ///
382 /// # Non-Determinism
383 /// This function is used as an escape hatch, and any mistakes in the
384 /// provided retries guarantee will propagate into the guarantees
385 /// for the rest of the program.
    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            // Same retry guarantee: reuse the node unchanged.
            KeyedStream::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            KeyedStream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            // Strengthening the guarantee requires observing non-determinism.
            KeyedStream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }
416
417 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
418 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
419 /// which is always safe because that is the weakest possible guarantee.
420 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
421 self.weaken_retries::<AtLeastOnce>()
422 }
423
424 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
425 /// enforcing that `R2` is weaker than the input retries guarantee.
426 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
427 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
428 self.assume_retries::<R2>(nondet)
429 }
430
431 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
432 /// implies that `O == TotalOrder`.
433 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
434 where
435 O: IsOrdered,
436 {
437 self.assume_ordering(nondet!(/** no-op */))
438 }
439
440 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
441 /// implies that `R == ExactlyOnce`.
442 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
443 where
444 R: IsExactlyOnce,
445 {
446 self.assume_retries(nondet!(/** no-op */))
447 }
448
449 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
450 /// implies that `B == Bounded`.
451 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
452 where
453 B: IsBounded,
454 {
455 KeyedStream::new(
456 self.location.clone(),
457 self.ir_node.replace(HydroNode::Placeholder),
458 )
459 }
460
461 /// Flattens the keyed stream into an unordered stream of key-value pairs.
462 ///
463 /// # Example
464 /// ```rust
465 /// # #[cfg(feature = "deploy")] {
466 /// # use hydro_lang::prelude::*;
467 /// # use futures::StreamExt;
468 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469 /// process
470 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
471 /// .into_keyed()
472 /// .entries()
473 /// # }, |mut stream| async move {
474 /// // (1, 2), (1, 3), (2, 4) in any order
475 /// # let mut results = Vec::new();
476 /// # for _ in 0..3 {
477 /// # results.push(stream.next().await.unwrap());
478 /// # }
479 /// # results.sort();
480 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
481 /// # }));
482 /// # }
483 /// ```
    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
        // The flattened stream is `NoOrder` because key order is
        // non-deterministic; in the IR this is just a `Cast` to the stream
        // collection kind.
        Stream::new(
            self.location.clone(),
            HydroNode::Cast {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
495
496 /// Flattens the keyed stream into an unordered stream of only the values.
497 ///
498 /// # Example
499 /// ```rust
500 /// # #[cfg(feature = "deploy")] {
501 /// # use hydro_lang::prelude::*;
502 /// # use futures::StreamExt;
503 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
504 /// process
505 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
506 /// .into_keyed()
507 /// .values()
508 /// # }, |mut stream| async move {
509 /// // 2, 3, 4 in any order
510 /// # let mut results = Vec::new();
511 /// # for _ in 0..3 {
512 /// # results.push(stream.next().await.unwrap());
513 /// # }
514 /// # results.sort();
515 /// # assert_eq!(results, vec![2, 3, 4]);
516 /// # }));
517 /// # }
518 /// ```
    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
        // Flatten to key-value pairs, then project away the key.
        self.entries().map(q!(|(_, v)| v))
    }
522
523 /// Flattens the keyed stream into an unordered stream of just the keys.
524 ///
525 /// # Example
526 /// ```rust
527 /// # #[cfg(feature = "deploy")] {
528 /// # use hydro_lang::prelude::*;
529 /// # use futures::StreamExt;
530 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
531 /// # process
532 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
533 /// # .into_keyed()
534 /// # .keys()
535 /// # }, |mut stream| async move {
536 /// // 1, 2 in any order
537 /// # let mut results = Vec::new();
538 /// # for _ in 0..2 {
539 /// # results.push(stream.next().await.unwrap());
540 /// # }
541 /// # results.sort();
542 /// # assert_eq!(results, vec![1, 2]);
543 /// # }));
544 /// # }
545 /// ```
    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
    where
        K: Eq + Hash,
    {
        // Project out the key, then deduplicate; per the return type, the
        // deduplicated output carries the `ExactlyOnce` guarantee.
        self.entries().map(q!(|(k, _)| k)).unique()
    }
552
553 /// Transforms each value by invoking `f` on each element, with keys staying the same
554 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
555 ///
556 /// If you do not want to modify the stream and instead only want to view
557 /// each item use [`KeyedStream::inspect`] instead.
558 ///
559 /// # Example
560 /// ```rust
561 /// # #[cfg(feature = "deploy")] {
562 /// # use hydro_lang::prelude::*;
563 /// # use futures::StreamExt;
564 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565 /// process
566 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
567 /// .into_keyed()
568 /// .map(q!(|v| v + 1))
569 /// # .entries()
570 /// # }, |mut stream| async move {
571 /// // { 1: [3, 4], 2: [5] }
572 /// # let mut results = Vec::new();
573 /// # for _ in 0..3 {
574 /// # results.push(stream.next().await.unwrap());
575 /// # }
576 /// # results.sort();
577 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
578 /// # }));
579 /// # }
580 /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Capture the user closure so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift `f: V -> U` to `(K, V) -> (K, U)`, leaving the key untouched.
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
604
605 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
607 ///
608 /// If you do not want to modify the stream and instead only want to view
609 /// each item use [`KeyedStream::inspect_with_key`] instead.
610 ///
611 /// # Example
612 /// ```rust
613 /// # #[cfg(feature = "deploy")] {
614 /// # use hydro_lang::prelude::*;
615 /// # use futures::StreamExt;
616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617 /// process
618 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
619 /// .into_keyed()
620 /// .map_with_key(q!(|(k, v)| k + v))
621 /// # .entries()
622 /// # }, |mut stream| async move {
623 /// // { 1: [3, 4], 2: [6] }
624 /// # let mut results = Vec::new();
625 /// # for _ in 0..3 {
626 /// # results.push(stream.next().await.unwrap());
627 /// # }
628 /// # results.sort();
629 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
630 /// # }));
631 /// # }
632 /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Capture the user closure so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so it can both be passed to `f` and kept as the
        // group key of the output element.
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
663
664 /// Prepends a new value to the key of each element in the stream, producing a new
665 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
666 /// occurs and the elements in each group preserve their original order.
667 ///
668 /// # Example
669 /// ```rust
670 /// # #[cfg(feature = "deploy")] {
671 /// # use hydro_lang::prelude::*;
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674 /// process
675 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
676 /// .into_keyed()
677 /// .prefix_key(q!(|&(k, _)| k % 2))
678 /// # .entries()
679 /// # }, |mut stream| async move {
680 /// // { (1, 1): [2, 3], (0, 2): [4] }
681 /// # let mut results = Vec::new();
682 /// # for _ in 0..3 {
683 /// # results.push(stream.next().await.unwrap());
684 /// # }
685 /// # results.sort();
686 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
687 /// # }));
688 /// # }
689 /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // Capture the user closure so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Compute the new key prefix from a borrow of the pair, then rebuild
        // the element as `((prefix, old_key), value)`.
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
719
720 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
721 /// `f`, preserving the order of the elements within the group.
722 ///
723 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
724 /// not modify or take ownership of the values. If you need to modify the values while filtering
725 /// use [`KeyedStream::filter_map`] instead.
726 ///
727 /// # Example
728 /// ```rust
729 /// # #[cfg(feature = "deploy")] {
730 /// # use hydro_lang::prelude::*;
731 /// # use futures::StreamExt;
732 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733 /// process
734 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
735 /// .into_keyed()
736 /// .filter(q!(|&x| x > 2))
737 /// # .entries()
738 /// # }, |mut stream| async move {
739 /// // { 1: [3], 2: [4] }
740 /// # let mut results = Vec::new();
741 /// # for _ in 0..2 {
742 /// # results.push(stream.next().await.unwrap());
743 /// # }
744 /// # results.sort();
745 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
746 /// # }));
747 /// # }
748 /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Capture the user predicate so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Lift the value predicate to a predicate over `&(K, V)` that
        // ignores the key.
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
770
771 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
772 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
773 ///
774 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
775 /// not modify or take ownership of the values. If you need to modify the values while filtering
776 /// use [`KeyedStream::filter_map_with_key`] instead.
777 ///
778 /// # Example
779 /// ```rust
780 /// # #[cfg(feature = "deploy")] {
781 /// # use hydro_lang::prelude::*;
782 /// # use futures::StreamExt;
783 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
784 /// process
785 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
786 /// .into_keyed()
787 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
788 /// # .entries()
789 /// # }, |mut stream| async move {
790 /// // { 1: [3], 2: [4] }
791 /// # let mut results = Vec::new();
792 /// # for _ in 0..2 {
793 /// # results.push(stream.next().await.unwrap());
794 /// # }
795 /// # results.sort();
796 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
797 /// # }));
798 /// # }
799 /// ```
    pub fn filter_with_key<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> bool + 'a,
    {
        // The user predicate already takes `&(K, V)`, so it can be spliced
        // directly with no wrapper closure.
        let filter_f = f
            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
            .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
820
821 /// An operator that both filters and maps each value, with keys staying the same.
822 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
823 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
824 ///
825 /// # Example
826 /// ```rust
827 /// # #[cfg(feature = "deploy")] {
828 /// # use hydro_lang::prelude::*;
829 /// # use futures::StreamExt;
830 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831 /// process
832 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
833 /// .into_keyed()
834 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
835 /// # .entries()
836 /// # }, |mut stream| async move {
837 /// // { 1: [2], 2: [4] }
838 /// # let mut results = Vec::new();
839 /// # for _ in 0..2 {
840 /// # results.push(stream.next().await.unwrap());
841 /// # }
842 /// # results.sort();
843 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
844 /// # }));
845 /// # }
846 /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Capture the user closure so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift `f: V -> Option<U>` to `(K, V) -> Option<(K, U)>`,
        // re-attaching the key only when `f` keeps the value.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
873
874 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
875 /// re-grouped even they are tuples; instead they will be grouped under the original key.
876 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// process
885 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
886 /// .into_keyed()
887 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
888 /// # .entries()
889 /// # }, |mut stream| async move {
890 /// // { 2: [2] }
891 /// # let mut results = Vec::new();
892 /// # for _ in 0..1 {
893 /// # results.push(stream.next().await.unwrap());
894 /// # }
895 /// # results.sort();
896 /// # assert_eq!(results, vec![(2, 2)]);
897 /// # }));
898 /// # }
899 /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Capture the user closure so it can be spliced into the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so it can be passed to `f` and still be
        // re-attached to any surviving value.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
930
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Cross-location joins are not allowed; both sides must live at the same location.
        check_matching_location(&self.location, &other.location);

        // Build the cross product on the underlying entry stream `((K, V), O2)`,
        // then reshape each tuple to `(K, (V, O2))` and restore the keyed view.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
982
    /// For each value `v` in each group, transform `v` using `f` and then treat the
    /// result as an [`Iterator`] to produce values one by one within the same group.
    /// The implementation for [`Iterator`] for the output type `I` must produce items
    /// in a **deterministic** order.
    ///
    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    ///     .into_keyed()
    ///     .flat_map_ordered(q!(|x| x))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3, 4], 2: [5, 6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..5 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the location context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Wrap the user closure so each produced item is re-keyed with a clone of `k`,
        // keeping every emitted item in the original group.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Ordering `O` is preserved because `I` is required to iterate deterministically.
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1041
    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `I` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flat_map_unordered(q!(|x| x))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Same wrapping as `flat_map_ordered`: re-key each produced item with a clone
        // of `k`. The only difference is the output ordering guarantee below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output is downgraded to `NoOrder` since `I` may iterate non-deterministically.
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1098
    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
    /// items in a **deterministic** order.
    ///
    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    ///     .into_keyed()
    ///     .flatten_ordered()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3, 4], 2: [5, 6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..5 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Flattening is just flat-mapping with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }
1135
    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the value type `V` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flatten_unordered()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Flattening is just flat-mapping with the identity function; the `NoOrder`
        // output bound comes from `flat_map_unordered`.
        self.flat_map_unordered(q!(|d| d))
    }
1171
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each value. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect(q!(|v| println!("{}", v)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        // The IR operates on `(K, V)` entries, so adapt the user's `&V` closure to
        // receive a borrowed pair and forward only a reference to the value.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        let inspect_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        // Output type is `Self`: inspect never changes elements or guarantees.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1218
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&(K, V)) + 'a,
    {
        // The user closure already takes a borrowed `(K, V)` pair, so it can be
        // spliced directly without a wrapper (unlike `inspect`).
        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1259
1260 /// An operator which allows you to "name" a `HydroNode`.
1261 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1262 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1263 {
1264 let mut node = self.ir_node.borrow_mut();
1265 let metadata = node.metadata_mut();
1266 metadata.tag = Some(name.to_owned());
1267 }
1268 self
1269 }
1270
    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
    ///
    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
    /// early by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, that key's group is terminated and no more elements are
    /// processed for it; other groups are unaffected.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    ///     .into_keyed()
    ///     .scan(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: [1], 1: [3, 7] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Implemented on top of `generator`: `Some(out)` becomes a `Yield` and
        // `None` becomes `Break`, which stops processing for that key only.
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1336
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Per-key state lives in a `HashMap<K, Option<A>>`; a `None` entry marks a
        // group that has terminated (`Return`/`Break`) and should ignore later inputs.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // The scan step returns `Some(Some((k, out)))` to emit an item, or
        // `Some(None)` to emit nothing for this input; the outer `Option` is always
        // `Some` so the underlying `Scan` never terminates the whole stream.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already terminated: drop the input without calling `f`.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the `Option<(K, U)>` scan output, discarding the `None` markers.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1458
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. But the aggregation function returns a boolean,
    /// which when true indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Drive a `generator` whose state is `Option<A>`: once `f` signals completion,
        // the accumulator is moved out via `Return`, which also terminates the group.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` never calls us again after `Return` for this key,
                    // so the `None` state is never observed here.
                    unreachable!()
                }
            }),
        );

        // Each key emits exactly one final value, so the value side of the result
        // can be cast to the bounded variant of `B`.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(
                    out_without_bound_cast
                        .ir_node
                        .replace(HydroNode::Placeholder),
                ),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WithBoundedValue>::collection_kind(),
                    ),
            },
        )
    }
1539
    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
    /// otherwise the first element would be non-deterministic.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .first()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 3 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
    {
        // A fold that stops on the very first element: store it and return `true`.
        // The `unwrap` is safe because the accumulator is always `Some` when released.
        self.fold_early_stop(
            q!(|| None),
            q!(|acc, v| {
                *acc = Some(v);
                true
            }),
        )
        .map(q!(|v| v.unwrap()))
    }
1581
    /// Returns a keyed stream containing at most the first `n` values per key,
    /// preserving the original order within each group. Similar to SQL `LIMIT`
    /// applied per group.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
    /// retries, since the result depends on the order and cardinality of elements
    /// within each group.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
    ///     .into_keyed()
    ///     .limit(q!(2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [10, 20], 2: [40, 50] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
    /// # }));
    /// # }
    /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
    {
        // Per-key counter via `generator`: yield the first `n - 1` items, `Return`
        // the `n`-th (which also terminates the group), and the initial `Break`
        // check handles the degenerate `n == 0` case by emitting nothing.
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1637
    /// Assigns a zero-based index to each value within each key group, emitting
    /// `(K, (index, V))` tuples with per-key sequential indices.
    ///
    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
    /// This is a streaming operator that processes elements as they arrive.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
    ///     .into_keyed()
    ///     .enumerate()
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
    /// # assert_eq!(key2, vec![(0, 20)]);
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Eq + Hash + Clone,
    {
        // A scan with a per-key counter: emit the pre-increment index alongside
        // the value, never terminating any group (always `Some`).
        self.scan(
            q!(|| 0),
            q!(|acc, next| {
                let curr = *acc;
                *acc += 1;
                Some((curr, next))
            }),
        )
    }
1683
    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .value_counts()
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 3), (2, 2)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn value_counts(
        self,
    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
    where
        R: IsExactlyOnce,
        K: Eq + Hash,
    {
        // Counting ignores values entirely, so any per-group ordering is safe to
        // assume; the fold's monotonicity proof lets the output carry the
        // monotone bound derived from `B`.
        self.make_exactly_once()
            .assume_ordering_trusted(
                nondet!(/** ordering within each group affects neither result nor intermediates */),
            )
            .fold(
                q!(|| 0),
                q!(
                    |acc, _| *acc += 1,
                    monotone = manual_proof!(/** += 1 is monotonic */)
                ),
            )
    }
1731
    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`KeyedStream::reduce`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> KeyedSingleton<K, A, L, B2>
    where
        K: Eq + Hash,
        // The closure's declared algebraic properties must be sufficient for the
        // input stream's ordering (`O`) and retry (`R`) guarantees.
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneKeyedStream<M, B2>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator along with its algebra proof, and record the proof.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // These assumptions are justified by the `ValidCommutativityFor` /
        // `ValidIdempotenceFor` bounds above; they only erase the type-level markers.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
            },
        )
    }
1798
    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce(q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator along with its algebra proof, and record the proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Justified by the `ValidCommutativityFor` / `ValidIdempotenceFor` bounds
        // above; these only erase the type-level ordering/retry markers.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
    }
1860
    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
    /// are automatically deleted.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let watermark = tick.singleton(q!(2));
    /// let numbers = process
    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (2, true)
    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
    /// # }));
    /// # }
    /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        // The watermark lives in a tick whose outer location must match this
        // stream's root location.
        check_matching_location(&self.location.root(), other.location.outer());
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Justified by the `ValidCommutativityFor` / `ValidIdempotenceFor` bounds
        // above; these only erase the type-level ordering/retry markers.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
    }
1922
1923 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1924 /// whose keys are not in the bounded stream.
1925 ///
1926 /// # Example
1927 /// ```rust
1928 /// # #[cfg(feature = "deploy")] {
1929 /// # use hydro_lang::prelude::*;
1930 /// # use futures::StreamExt;
1931 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1932 /// let tick = process.tick();
1933 /// let keyed_stream = process
1934 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1935 /// .batch(&tick, nondet!(/** test */))
1936 /// .into_keyed();
1937 /// let keys_to_remove = process
1938 /// .source_iter(q!(vec![1, 2]))
1939 /// .batch(&tick, nondet!(/** test */));
1940 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1941 /// # .entries()
1942 /// # }, |mut stream| async move {
1943 /// // { 3: ['c'], 4: ['d'] }
1944 /// # let mut results = Vec::new();
1945 /// # for _ in 0..2 {
1946 /// # results.push(stream.next().await.unwrap());
1947 /// # }
1948 /// # results.sort();
1949 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1950 /// # }));
1951 /// # }
1952 /// ```
1953 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1954 self,
1955 other: Stream<K, L, Bounded, O2, R2>,
1956 ) -> Self
1957 where
1958 K: Eq + Hash,
1959 {
1960 check_matching_location(&self.location, &other.location);
1961
1962 KeyedStream::new(
1963 self.location.clone(),
1964 HydroNode::AntiJoin {
1965 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1966 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1967 metadata: self.location.new_node_metadata(Self::collection_kind()),
1968 },
1969 )
1970 }
1971
1972 /// Emit a keyed stream containing keys shared between two keyed streams,
1973 /// where each value in the output keyed stream is a tuple of
1974 /// (self's value, other's value).
1975 /// If there are multiple values for the same key, this performs a cross product
1976 /// for each matching key.
1977 ///
1978 /// # Example
1979 /// ```rust
1980 /// # #[cfg(feature = "deploy")] {
1981 /// # use hydro_lang::prelude::*;
1982 /// # use futures::StreamExt;
1983 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1984 /// let tick = process.tick();
1985 /// let keyed_data = process
1986 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1987 /// .into_keyed()
1988 /// .batch(&tick, nondet!(/** test */));
1989 /// let other_data = process
1990 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1991 /// .into_keyed()
1992 /// .batch(&tick, nondet!(/** test */));
1993 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1994 /// # }, |mut stream| async move {
1995 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1996 /// # let mut results = vec![];
1997 /// # for _ in 0..4 {
1998 /// # results.push(stream.next().await.unwrap());
1999 /// # }
2000 /// # results.sort();
2001 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2002 /// # }));
2003 /// # }
2004 /// ```
2005 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
2006 self,
2007 other: KeyedStream<K, V2, L, B, O2, R2>,
2008 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2009 where
2010 K: Eq + Hash,
2011 R: MinRetries<R2>,
2012 {
2013 self.entries().join(other.entries()).into_keyed()
2014 }
2015
2016 /// Deduplicates values within each key group, emitting each unique value per key
2017 /// exactly once.
2018 ///
2019 /// # Example
2020 /// ```rust
2021 /// # #[cfg(feature = "deploy")] {
2022 /// # use hydro_lang::prelude::*;
2023 /// # use futures::StreamExt;
2024 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2025 /// process
2026 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2027 /// .into_keyed()
2028 /// .unique()
2029 /// # .entries()
2030 /// # }, |mut stream| async move {
2031 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2032 /// # let mut results = Vec::new();
2033 /// # for _ in 0..4 {
2034 /// # results.push(stream.next().await.unwrap());
2035 /// # }
2036 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2037 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2038 /// # key1.sort();
2039 /// # key2.sort();
2040 /// # assert_eq!(key1, vec![10, 20]);
2041 /// # assert_eq!(key2, vec![20, 30]);
2042 /// # }));
2043 /// # }
2044 /// ```
2045 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2046 where
2047 K: Eq + Hash + Clone,
2048 V: Eq + Hash + Clone,
2049 {
2050 self.entries().unique().into_keyed()
2051 }
2052
2053 /// Sorts the values within each key group in ascending order.
2054 ///
2055 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2056 /// each group. This operator will block until all elements in the input stream
2057 /// are available, so it requires the input stream to be [`Bounded`].
2058 ///
2059 /// # Example
2060 /// ```rust
2061 /// # #[cfg(feature = "deploy")] {
2062 /// # use hydro_lang::prelude::*;
2063 /// # use futures::StreamExt;
2064 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2065 /// let tick = process.tick();
2066 /// let numbers = process
2067 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2068 /// .into_keyed();
2069 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2070 /// batch.sort().all_ticks()
2071 /// # .entries()
2072 /// # }, |mut stream| async move {
2073 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2074 /// # let mut results = Vec::new();
2075 /// # for _ in 0..4 {
2076 /// # results.push(stream.next().await.unwrap());
2077 /// # }
2078 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2079 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2080 /// # assert_eq!(key1_vals, vec![1, 3]);
2081 /// # assert_eq!(key2_vals, vec![1, 2]);
2082 /// # }));
2083 /// # }
2084 /// ```
2085 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2086 where
2087 B: IsBounded,
2088 K: Ord,
2089 V: Ord,
2090 {
2091 self.entries().sort().into_keyed()
2092 }
2093
2094 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2095 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2096 /// is only present in one of the inputs, its values are passed through as-is). The output has
2097 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2098 ///
2099 /// Currently, both input streams must be [`Bounded`]. This operator will block
2100 /// on the first stream until all its elements are available. In a future version,
2101 /// we will relax the requirement on the `other` stream.
2102 ///
2103 /// # Example
2104 /// ```rust
2105 /// # #[cfg(feature = "deploy")] {
2106 /// # use hydro_lang::prelude::*;
2107 /// # use futures::StreamExt;
2108 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2109 /// let tick = process.tick();
2110 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2111 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2112 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2113 /// # .entries()
2114 /// # }, |mut stream| async move {
2115 /// // { 0: [2, 1], 1: [4, 3] }
2116 /// # let mut results = Vec::new();
2117 /// # for _ in 0..4 {
2118 /// # results.push(stream.next().await.unwrap());
2119 /// # }
2120 /// # results.sort();
2121 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2122 /// # }));
2123 /// # }
2124 /// ```
2125 pub fn chain<O2: Ordering, R2: Retries>(
2126 self,
2127 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2128 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2129 where
2130 B: IsBounded,
2131 O: MinOrder<O2>,
2132 R: MinRetries<R2>,
2133 {
2134 let this = self.make_bounded();
2135 check_matching_location(&this.location, &other.location);
2136
2137 KeyedStream::new(
2138 this.location.clone(),
2139 HydroNode::Chain {
2140 first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2141 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2142 metadata: this.location.new_node_metadata(KeyedStream::<
2143 K,
2144 V,
2145 L,
2146 Bounded,
2147 <O as MinOrder<O2>>::Min,
2148 <R as MinRetries<R2>>::Min,
2149 >::collection_kind()),
2150 },
2151 )
2152 }
2153
2154 /// Emit a keyed stream containing keys shared between the keyed stream and the
2155 /// keyed singleton, where each value in the output keyed stream is a tuple of
2156 /// (the keyed stream's value, the keyed singleton's value).
2157 ///
2158 /// # Example
2159 /// ```rust
2160 /// # #[cfg(feature = "deploy")] {
2161 /// # use hydro_lang::prelude::*;
2162 /// # use futures::StreamExt;
2163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2164 /// let tick = process.tick();
2165 /// let keyed_data = process
2166 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2167 /// .into_keyed()
2168 /// .batch(&tick, nondet!(/** test */));
2169 /// let singleton_data = process
2170 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2171 /// .into_keyed()
2172 /// .batch(&tick, nondet!(/** test */))
2173 /// .first();
2174 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2175 /// # }, |mut stream| async move {
2176 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2177 /// # let mut results = vec![];
2178 /// # for _ in 0..3 {
2179 /// # results.push(stream.next().await.unwrap());
2180 /// # }
2181 /// # results.sort();
2182 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2183 /// # }));
2184 /// # }
2185 /// ```
2186 pub fn join_keyed_singleton<V2: Clone>(
2187 self,
2188 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2189 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2190 where
2191 B: IsBounded,
2192 K: Eq + Hash,
2193 {
2194 keyed_singleton
2195 .join_keyed_stream(self.make_bounded())
2196 .map(q!(|(v2, v)| (v, v2)))
2197 }
2198
2199 /// Gets the values associated with a specific key from the keyed stream.
2200 /// Returns an empty stream if the key is `None` or there are no associated values.
2201 ///
2202 /// # Example
2203 /// ```rust
2204 /// # #[cfg(feature = "deploy")] {
2205 /// # use hydro_lang::prelude::*;
2206 /// # use futures::StreamExt;
2207 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2208 /// let tick = process.tick();
2209 /// let keyed_data = process
2210 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2211 /// .into_keyed()
2212 /// .batch(&tick, nondet!(/** test */));
2213 /// let key = tick.singleton(q!(1));
2214 /// keyed_data.get(key).all_ticks()
2215 /// # }, |mut stream| async move {
2216 /// // 10, 11 in any order
2217 /// # let mut results = vec![];
2218 /// # for _ in 0..2 {
2219 /// # results.push(stream.next().await.unwrap());
2220 /// # }
2221 /// # results.sort();
2222 /// # assert_eq!(results, vec![10, 11]);
2223 /// # }));
2224 /// # }
2225 /// ```
2226 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2227 where
2228 B: IsBounded,
2229 K: Eq + Hash,
2230 {
2231 self.make_bounded()
2232 .entries()
2233 .join(key.into().into_stream().map(q!(|k| (k, ()))))
2234 .map(q!(|(_, (v, _))| v))
2235 }
2236
2237 /// For each value in `self`, find the matching key in `lookup`.
2238 /// The output is a keyed stream with the key from `self`, and a value
2239 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2240 /// If the key is not present in `lookup`, the option will be [`None`].
2241 ///
2242 /// # Example
2243 /// ```rust
2244 /// # #[cfg(feature = "deploy")] {
2245 /// # use hydro_lang::prelude::*;
2246 /// # use futures::StreamExt;
2247 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2248 /// # let tick = process.tick();
2249 /// let requests = // { 1: [10, 11], 2: 20 }
2250 /// # process
2251 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2252 /// # .into_keyed()
2253 /// # .batch(&tick, nondet!(/** test */));
2254 /// let other_data = // { 10: 100, 11: 110 }
2255 /// # process
2256 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2257 /// # .into_keyed()
2258 /// # .batch(&tick, nondet!(/** test */))
2259 /// # .first();
2260 /// requests.lookup_keyed_singleton(other_data)
2261 /// # .entries().all_ticks()
2262 /// # }, |mut stream| async move {
2263 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2264 /// # let mut results = vec![];
2265 /// # for _ in 0..3 {
2266 /// # results.push(stream.next().await.unwrap());
2267 /// # }
2268 /// # results.sort();
2269 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2270 /// # }));
2271 /// # }
2272 /// ```
2273 pub fn lookup_keyed_singleton<V2>(
2274 self,
2275 lookup: KeyedSingleton<V, V2, L, Bounded>,
2276 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2277 where
2278 B: IsBounded,
2279 K: Eq + Hash + Clone,
2280 V: Eq + Hash + Clone,
2281 V2: Clone,
2282 {
2283 self.lookup_keyed_stream(
2284 lookup
2285 .into_keyed_stream()
2286 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2287 )
2288 }
2289
2290 /// For each value in `self`, find the matching key in `lookup`.
2291 /// The output is a keyed stream with the key from `self`, and a value
2292 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2293 /// If the key is not present in `lookup`, the option will be [`None`].
2294 ///
2295 /// # Example
2296 /// ```rust
2297 /// # #[cfg(feature = "deploy")] {
2298 /// # use hydro_lang::prelude::*;
2299 /// # use futures::StreamExt;
2300 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2301 /// # let tick = process.tick();
2302 /// let requests = // { 1: [10, 11], 2: 20 }
2303 /// # process
2304 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2305 /// # .into_keyed()
2306 /// # .batch(&tick, nondet!(/** test */));
2307 /// let other_data = // { 10: [100, 101], 11: 110 }
2308 /// # process
2309 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2310 /// # .into_keyed()
2311 /// # .batch(&tick, nondet!(/** test */));
2312 /// requests.lookup_keyed_stream(other_data)
2313 /// # .entries().all_ticks()
2314 /// # }, |mut stream| async move {
2315 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2316 /// # let mut results = vec![];
2317 /// # for _ in 0..4 {
2318 /// # results.push(stream.next().await.unwrap());
2319 /// # }
2320 /// # results.sort();
2321 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2322 /// # }));
2323 /// # }
2324 /// ```
2325 #[expect(clippy::type_complexity, reason = "retries propagation")]
2326 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2327 self,
2328 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2329 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2330 where
2331 B: IsBounded,
2332 K: Eq + Hash + Clone,
2333 V: Eq + Hash + Clone,
2334 V2: Clone,
2335 R: MinRetries<R2>,
2336 {
2337 let inverted = self
2338 .make_bounded()
2339 .entries()
2340 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2341 .into_keyed();
2342 let found = inverted
2343 .clone()
2344 .join_keyed_stream(lookup.clone())
2345 .entries()
2346 .map(q!(|(lookup_value, (key, value))| (
2347 key,
2348 (lookup_value, Some(value))
2349 )))
2350 .into_keyed();
2351 let not_found = inverted
2352 .filter_key_not_in(lookup.keys())
2353 .entries()
2354 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2355 .into_keyed();
2356
2357 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2358 }
2359
2360 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2361 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2362 ///
2363 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2364 /// processed before an acknowledgement is emitted.
2365 pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2366 let id = self.location.flow_state().borrow_mut().next_clock_id();
2367 let out_location = Atomic {
2368 tick: Tick {
2369 id,
2370 l: self.location.clone(),
2371 },
2372 };
2373 KeyedStream::new(
2374 out_location.clone(),
2375 HydroNode::BeginAtomic {
2376 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2377 metadata: out_location
2378 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2379 },
2380 )
2381 }
2382
2383 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2384 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2385 /// the order of the input.
2386 ///
2387 /// # Non-Determinism
2388 /// The batch boundaries are non-deterministic and may change across executions.
2389 pub fn batch(
2390 self,
2391 tick: &Tick<L>,
2392 nondet: NonDet,
2393 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2394 let _ = nondet;
2395 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2396 KeyedStream::new(
2397 tick.clone(),
2398 HydroNode::Batch {
2399 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2400 metadata: tick.new_node_metadata(
2401 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2402 ),
2403 },
2404 )
2405 }
2406}
2407
2408impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2409 KeyedStream<(K1, K2), V, L, B, O, R>
2410{
2411 /// Produces a new keyed stream by dropping the first element of the compound key.
2412 ///
2413 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2414 /// of the values under the new keys. The values across groups with the same new key
2415 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2416 ///
2417 /// # Example
2418 /// ```rust
2419 /// # #[cfg(feature = "deploy")] {
2420 /// # use hydro_lang::prelude::*;
2421 /// # use futures::StreamExt;
2422 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2423 /// process
2424 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2425 /// .into_keyed()
2426 /// .drop_key_prefix()
2427 /// # .entries()
2428 /// # }, |mut stream| async move {
2429 /// // { 10: [2, 3], 20: [4] }
2430 /// # let mut results = Vec::new();
2431 /// # for _ in 0..3 {
2432 /// # results.push(stream.next().await.unwrap());
2433 /// # }
2434 /// # results.sort();
2435 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2436 /// # }));
2437 /// # }
2438 /// ```
2439 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2440 self.entries()
2441 .map(q!(|((_k1, k2), v)| (k2, v)))
2442 .into_keyed()
2443 }
2444}
2445
2446impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2447 KeyedStream<K, V, L, Unbounded, O, R>
2448{
2449 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2450 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2451 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2452 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2453 ///
2454 /// Currently, both input streams must be [`Unbounded`].
2455 ///
2456 /// # Example
2457 /// ```rust
2458 /// # #[cfg(feature = "deploy")] {
2459 /// # use hydro_lang::prelude::*;
2460 /// # use futures::StreamExt;
2461 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2462 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2463 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2464 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2465 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2466 /// numbers1.merge_unordered(numbers2)
2467 /// # .entries()
2468 /// # }, |mut stream| async move {
2469 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2470 /// # let mut results = Vec::new();
2471 /// # for _ in 0..4 {
2472 /// # results.push(stream.next().await.unwrap());
2473 /// # }
2474 /// # results.sort();
2475 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2476 /// # }));
2477 /// # }
2478 /// ```
2479 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2480 self,
2481 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2482 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2483 where
2484 R: MinRetries<R2>,
2485 {
2486 KeyedStream::new(
2487 self.location.clone(),
2488 HydroNode::Chain {
2489 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2490 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2491 metadata: self.location.new_node_metadata(KeyedStream::<
2492 K,
2493 V,
2494 L,
2495 Unbounded,
2496 NoOrder,
2497 <R as MinRetries<R2>>::Min,
2498 >::collection_kind()),
2499 },
2500 )
2501 }
2502
2503 /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2504 #[deprecated(note = "use `merge_unordered` instead")]
2505 pub fn interleave<O2: Ordering, R2: Retries>(
2506 self,
2507 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2508 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2509 where
2510 R: MinRetries<R2>,
2511 {
2512 self.merge_unordered(other)
2513 }
2514}
2515
2516impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2517where
2518 L: Location<'a> + NoTick,
2519{
2520 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2521 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2522 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2523 /// used to create the atomic section.
2524 ///
2525 /// # Non-Determinism
2526 /// The batch boundaries are non-deterministic and may change across executions.
2527 pub fn batch_atomic(
2528 self,
2529 tick: &Tick<L>,
2530 nondet: NonDet,
2531 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2532 let _ = nondet;
2533 KeyedStream::new(
2534 tick.clone(),
2535 HydroNode::Batch {
2536 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2537 metadata: tick.new_node_metadata(
2538 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2539 ),
2540 },
2541 )
2542 }
2543
2544 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2545 /// See [`KeyedStream::atomic`] for more details.
2546 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2547 KeyedStream::new(
2548 self.location.tick.l.clone(),
2549 HydroNode::EndAtomic {
2550 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2551 metadata: self
2552 .location
2553 .tick
2554 .l
2555 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2556 },
2557 )
2558 }
2559}
2560
impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
        // The output lives at the location enclosing this tick.
        KeyedStream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.outer().new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    O,
                    R,
                >::collection_kind(
                )),
            },
        )
    }

    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    ///
    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
        // Wrap this tick in an `Atomic` location so downstream logic stays synchronous
        // with this tick's execution.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        KeyedStream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Atomic<L>,
                    Unbounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory for each key across ticks rather than resetting across batches of each key.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
    ///         *sum += new;
    ///     }))).entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
    /// # results.clear();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift the batch into an atomic unbounded stream, apply the user transformation,
        // then re-batch the result back into this tick.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
    /// tick `T` always has the entries of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![(1, 2), (1, 3)]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .into_keyed();
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![(1, 4), (2, 5)]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick()
    /// #     .into_keyed(); // appears on the second tick
    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
    /// # batch_first_tick.chain(batch_second_tick);
    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
    ///     changes_across_ticks // from the current tick
    /// )
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: { 1: [2, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
    /// // Third tick: { 1: [4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // Entries emitted at tick T reappear on tick T + 1.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Tick<L>,
                    Bounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }
}
2743
#[cfg(test)]
mod tests {
    // Tests for `KeyedStream`: deploy-backed tests exercise `reduce_watermark`
    // end-to-end, while sim-backed tests exhaustively explore batching and
    // ordering nondeterminism (asserting exact instance counts where noted).
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;
    #[cfg(feature = "deploy")]
    use crate::properties::manual_proof;

    // `reduce_watermark` over an unbounded keyed source with a constant
    // watermark of 2; the first snapshot output observed is key 2's sum
    // (102 + 102 = 204). Keys 0 and 1 are never asserted on — presumably
    // filtered by the watermark, per the test name.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Same reduction but over a bounded `source_iter` input, reading entries
    // directly without snapshotting; again expects (2, 204) as the first output.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Advances the watermark by one per tick via a tick cycle, and injects a
    // late entry for key 3 only after an external trigger fires. Runs under
    // `with_default_optimize` — per the test name, this is meant to exercise
    // garbage collection of reduced state (TODO confirm against optimizer docs);
    // the observable contract checked here is that (2, 204) arrives first and
    // (3, 103) arrives after the trigger.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // (3, 103) is released only on ticks where the external trigger has
        // delivered at least one element.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first()
                    .is_some(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .merge_unordered(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    // Batch boundaries are nondeterministic, so asserting that key 1's two
    // values always land in one batch must fail in some explored schedule
    // (hence `should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });
    }

    // Batching a totally-ordered keyed stream must preserve per-key value
    // order across every explored batching; the early-stop fold observes the
    // running max, so key 1 always stops at 2.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    // With ordering weakened to `NoOrder`, the simulator additionally explores
    // per-key value shuffles across batches, so more instances are enumerated
    // than in the ordered variant above (13 vs 8).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }

    // `assume_ordering` on an unordered input is an unchecked claim: the
    // simulator still explores orders where `first()` picks (1, 2) or (2, 2),
    // so the assertion fails in some schedule (hence `should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }

    // Pins the exact number of schedules explored when order is observed
    // through batching plus `assume_ordering` — a regression guard on the
    // simulator's state-space size for keyed streams.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }

    // Top-level (un-batched) `assume_ordering` on a keyed stream: every
    // explored schedule still yields exactly one folded entry per key.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }

    // A value released by `assume_ordering` is mapped, sent through two
    // network hops, and merged back into the same stream; at least one
    // schedule must interleave the cycled-back value after its origin.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }

    // Same cycle-back scenario as `sim_top_level_assume_ordering_cycle_back`,
    // but the cycled path goes through a batch/all_ticks round trip; the
    // interleaving [0, 1] must still be reachable, with a different (pinned)
    // number of explored schedules.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }
}