hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::batch_atomic::BatchAtomic;
23use crate::live_collections::stream::{AtLeastOnce, Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
31
32pub mod networking;
33
34/// Streaming elements of type `V` grouped by a key of type `K`.
35///
36/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
37/// order of keys is non-deterministic but the order *within* each group may be deterministic.
38///
39/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
40/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
41/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
42///
43/// Type Parameters:
44/// - `K`: the type of the key for each group
45/// - `V`: the type of the elements inside each group
46/// - `Loc`: the [`Location`] where the keyed stream is materialized
47/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
48/// - `Order`: tracks whether the elements within each group have deterministic order
49/// ([`TotalOrder`]) or not ([`NoOrder`])
50/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
51/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
52pub struct KeyedStream<
53 K,
54 V,
55 Loc,
56 Bound: Boundedness = Unbounded,
57 Order: Ordering = TotalOrder,
58 Retry: Retries = ExactlyOnce,
59> {
60 pub(crate) location: Loc,
61 pub(crate) ir_node: RefCell<HydroNode>,
62
63 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
64}
65
66impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
67 for KeyedStream<K, V, L, Unbounded, O, R>
68where
69 L: Location<'a>,
70{
71 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
72 let new_meta = stream
73 .location
74 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
75
76 KeyedStream {
77 location: stream.location,
78 ir_node: RefCell::new(HydroNode::Cast {
79 inner: Box::new(stream.ir_node.into_inner()),
80 metadata: new_meta,
81 }),
82 _phantom: PhantomData,
83 }
84 }
85}
86
87impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
88 for KeyedStream<K, V, L, B, NoOrder, R>
89where
90 L: Location<'a>,
91{
92 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
93 stream.weakest_ordering()
94 }
95}
96
/// [`DeferTick`] for bounded keyed streams inside a [`Tick`]; delegates to the inherent method.
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): relies on an inherent `KeyedStream::defer_tick` defined outside this chunk.
        // Inherent associated items take priority over trait items in path resolution, so this
        // forwards to that method instead of recursing into this trait impl.
        KeyedStream::defer_tick(self)
    }
}
105
106impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
107 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
108where
109 L: Location<'a>,
110{
111 type Location = Tick<L>;
112
113 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
114 KeyedStream {
115 location: location.clone(),
116 ir_node: RefCell::new(HydroNode::CycleSource {
117 ident,
118 metadata: location.new_node_metadata(
119 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
120 ),
121 }),
122 _phantom: PhantomData,
123 }
124 }
125}
126
127impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
128 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
129where
130 L: Location<'a>,
131{
132 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
133 assert_eq!(
134 Location::id(&self.location),
135 expected_location,
136 "locations do not match"
137 );
138
139 self.location
140 .flow_state()
141 .borrow_mut()
142 .push_root(HydroRoot::CycleSink {
143 ident,
144 input: Box::new(self.ir_node.into_inner()),
145 op_metadata: HydroIrOpMetadata::new(),
146 });
147 }
148}
149
150impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
151 for KeyedStream<K, V, L, B, O, R>
152where
153 L: Location<'a> + NoTick,
154{
155 type Location = L;
156
157 fn create_source(ident: syn::Ident, location: L) -> Self {
158 KeyedStream {
159 location: location.clone(),
160 ir_node: RefCell::new(HydroNode::CycleSource {
161 ident,
162 metadata: location
163 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
164 }),
165 _phantom: PhantomData,
166 }
167 }
168}
169
170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
171 for KeyedStream<K, V, L, B, O, R>
172where
173 L: Location<'a> + NoTick,
174{
175 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
176 assert_eq!(
177 Location::id(&self.location),
178 expected_location,
179 "locations do not match"
180 );
181 self.location
182 .flow_state()
183 .borrow_mut()
184 .push_root(HydroRoot::CycleSink {
185 ident,
186 input: Box::new(self.ir_node.into_inner()),
187 op_metadata: HydroIrOpMetadata::new(),
188 });
189 }
190}
191
192impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
193 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
194{
195 fn clone(&self) -> Self {
196 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198 *self.ir_node.borrow_mut() = HydroNode::Tee {
199 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
200 metadata: self.location.new_node_metadata(Self::collection_kind()),
201 };
202 }
203
204 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205 KeyedStream {
206 location: self.location.clone(),
207 ir_node: HydroNode::Tee {
208 inner: TeeNode(inner.0.clone()),
209 metadata: metadata.clone(),
210 }
211 .into(),
212 _phantom: PhantomData,
213 }
214 } else {
215 unreachable!()
216 }
217 }
218}
219
220impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
221 KeyedStream<K, V, L, B, O, R>
222{
223 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
224 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
225 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
226
227 KeyedStream {
228 location,
229 ir_node: RefCell::new(ir_node),
230 _phantom: PhantomData,
231 }
232 }
233
234 /// Returns the [`CollectionKind`] corresponding to this type.
235 pub fn collection_kind() -> CollectionKind {
236 CollectionKind::KeyedStream {
237 bound: B::BOUND_KIND,
238 value_order: O::ORDERING_KIND,
239 value_retry: R::RETRIES_KIND,
240 key_type: stageleft::quote_type::<K>().into(),
241 value_type: stageleft::quote_type::<V>().into(),
242 }
243 }
244
245 /// Returns the [`Location`] where this keyed stream is being materialized.
246 pub fn location(&self) -> &L {
247 &self.location
248 }
249
250 /// Explicitly "casts" the keyed stream to a type with a different ordering
251 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
252 /// by the type-system.
253 ///
254 /// # Non-Determinism
255 /// This function is used as an escape hatch, and any mistakes in the
256 /// provided ordering guarantee will propagate into the guarantees
257 /// for the rest of the program.
258 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
259 if O::ORDERING_KIND == O2::ORDERING_KIND {
260 KeyedStream::new(self.location, self.ir_node.into_inner())
261 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
262 // We can always weaken the ordering guarantee
263 KeyedStream::new(
264 self.location.clone(),
265 HydroNode::Cast {
266 inner: Box::new(self.ir_node.into_inner()),
267 metadata: self
268 .location
269 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
270 },
271 )
272 } else {
273 KeyedStream::new(
274 self.location.clone(),
275 HydroNode::ObserveNonDet {
276 inner: Box::new(self.ir_node.into_inner()),
277 trusted: false,
278 metadata: self
279 .location
280 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
281 },
282 )
283 }
284 }
285
286 fn assume_ordering_trusted<O2: Ordering>(
287 self,
288 _nondet: NonDet,
289 ) -> KeyedStream<K, V, L, B, O2, R> {
290 if O::ORDERING_KIND == O2::ORDERING_KIND {
291 KeyedStream::new(self.location, self.ir_node.into_inner())
292 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
293 // We can always weaken the ordering guarantee
294 KeyedStream::new(
295 self.location.clone(),
296 HydroNode::Cast {
297 inner: Box::new(self.ir_node.into_inner()),
298 metadata: self
299 .location
300 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
301 },
302 )
303 } else {
304 KeyedStream::new(
305 self.location.clone(),
306 HydroNode::ObserveNonDet {
307 inner: Box::new(self.ir_node.into_inner()),
308 trusted: true,
309 metadata: self
310 .location
311 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
312 },
313 )
314 }
315 }
316
317 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
318 /// which is always safe because that is the weakest possible guarantee.
319 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
320 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
321 self.assume_ordering::<NoOrder>(nondet)
322 }
323
324 /// Explicitly "casts" the keyed stream to a type with a different retries
325 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
326 /// be proven by the type-system.
327 ///
328 /// # Non-Determinism
329 /// This function is used as an escape hatch, and any mistakes in the
330 /// provided retries guarantee will propagate into the guarantees
331 /// for the rest of the program.
332 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
333 if R::RETRIES_KIND == R2::RETRIES_KIND {
334 KeyedStream::new(self.location, self.ir_node.into_inner())
335 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
336 // We can always weaken the retries guarantee
337 KeyedStream::new(
338 self.location.clone(),
339 HydroNode::Cast {
340 inner: Box::new(self.ir_node.into_inner()),
341 metadata: self
342 .location
343 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
344 },
345 )
346 } else {
347 KeyedStream::new(
348 self.location.clone(),
349 HydroNode::ObserveNonDet {
350 inner: Box::new(self.ir_node.into_inner()),
351 trusted: false,
352 metadata: self
353 .location
354 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
355 },
356 )
357 }
358 }
359
360 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
361 /// which is always safe because that is the weakest possible guarantee.
362 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
363 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
364 self.assume_retries::<AtLeastOnce>(nondet)
365 }
366
367 /// Flattens the keyed stream into an unordered stream of key-value pairs.
368 ///
369 /// # Example
370 /// ```rust
371 /// # #[cfg(feature = "deploy")] {
372 /// # use hydro_lang::prelude::*;
373 /// # use futures::StreamExt;
374 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
375 /// process
376 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
377 /// .into_keyed()
378 /// .entries()
379 /// # }, |mut stream| async move {
380 /// // (1, 2), (1, 3), (2, 4) in any order
381 /// # let mut results = Vec::new();
382 /// # for _ in 0..3 {
383 /// # results.push(stream.next().await.unwrap());
384 /// # }
385 /// # results.sort();
386 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
387 /// # }));
388 /// # }
389 /// ```
390 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
391 Stream::new(
392 self.location.clone(),
393 HydroNode::Cast {
394 inner: Box::new(self.ir_node.into_inner()),
395 metadata: self
396 .location
397 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
398 },
399 )
400 }
401
402 /// Flattens the keyed stream into an unordered stream of only the values.
403 ///
404 /// # Example
405 /// ```rust
406 /// # #[cfg(feature = "deploy")] {
407 /// # use hydro_lang::prelude::*;
408 /// # use futures::StreamExt;
409 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
410 /// process
411 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
412 /// .into_keyed()
413 /// .values()
414 /// # }, |mut stream| async move {
415 /// // 2, 3, 4 in any order
416 /// # let mut results = Vec::new();
417 /// # for _ in 0..3 {
418 /// # results.push(stream.next().await.unwrap());
419 /// # }
420 /// # results.sort();
421 /// # assert_eq!(results, vec![2, 3, 4]);
422 /// # }));
423 /// # }
424 /// ```
425 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
426 self.entries().map(q!(|(_, v)| v))
427 }
428
429 /// Transforms each value by invoking `f` on each element, with keys staying the same
430 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
431 ///
432 /// If you do not want to modify the stream and instead only want to view
433 /// each item use [`KeyedStream::inspect`] instead.
434 ///
435 /// # Example
436 /// ```rust
437 /// # #[cfg(feature = "deploy")] {
438 /// # use hydro_lang::prelude::*;
439 /// # use futures::StreamExt;
440 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
441 /// process
442 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
443 /// .into_keyed()
444 /// .map(q!(|v| v + 1))
445 /// # .entries()
446 /// # }, |mut stream| async move {
447 /// // { 1: [3, 4], 2: [5] }
448 /// # let mut results = Vec::new();
449 /// # for _ in 0..3 {
450 /// # results.push(stream.next().await.unwrap());
451 /// # }
452 /// # results.sort();
453 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
454 /// # }));
455 /// # }
456 /// ```
457 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
458 where
459 F: Fn(V) -> U + 'a,
460 {
461 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
462 let map_f = q!({
463 let orig = f;
464 move |(k, v)| (k, orig(v))
465 })
466 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
467 .into();
468
469 KeyedStream::new(
470 self.location.clone(),
471 HydroNode::Map {
472 f: map_f,
473 input: Box::new(self.ir_node.into_inner()),
474 metadata: self
475 .location
476 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
477 },
478 )
479 }
480
481 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
482 /// re-grouped even they are tuples; instead they will be grouped under the original key.
483 ///
484 /// If you do not want to modify the stream and instead only want to view
485 /// each item use [`KeyedStream::inspect_with_key`] instead.
486 ///
487 /// # Example
488 /// ```rust
489 /// # #[cfg(feature = "deploy")] {
490 /// # use hydro_lang::prelude::*;
491 /// # use futures::StreamExt;
492 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
493 /// process
494 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
495 /// .into_keyed()
496 /// .map_with_key(q!(|(k, v)| k + v))
497 /// # .entries()
498 /// # }, |mut stream| async move {
499 /// // { 1: [3, 4], 2: [6] }
500 /// # let mut results = Vec::new();
501 /// # for _ in 0..3 {
502 /// # results.push(stream.next().await.unwrap());
503 /// # }
504 /// # results.sort();
505 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
506 /// # }));
507 /// # }
508 /// ```
509 pub fn map_with_key<U, F>(
510 self,
511 f: impl IntoQuotedMut<'a, F, L> + Copy,
512 ) -> KeyedStream<K, U, L, B, O, R>
513 where
514 F: Fn((K, V)) -> U + 'a,
515 K: Clone,
516 {
517 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
518 let map_f = q!({
519 let orig = f;
520 move |(k, v)| {
521 let out = orig((Clone::clone(&k), v));
522 (k, out)
523 }
524 })
525 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
526 .into();
527
528 KeyedStream::new(
529 self.location.clone(),
530 HydroNode::Map {
531 f: map_f,
532 input: Box::new(self.ir_node.into_inner()),
533 metadata: self
534 .location
535 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
536 },
537 )
538 }
539
540 /// Prepends a new value to the key of each element in the stream, producing a new
541 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
542 /// occurs and the elements in each group preserve their original order.
543 ///
544 /// # Example
545 /// ```rust
546 /// # #[cfg(feature = "deploy")] {
547 /// # use hydro_lang::prelude::*;
548 /// # use futures::StreamExt;
549 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
550 /// process
551 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
552 /// .into_keyed()
553 /// .prefix_key(q!(|&(k, _)| k % 2))
554 /// # .entries()
555 /// # }, |mut stream| async move {
556 /// // { (1, 1): [2, 3], (0, 2): [4] }
557 /// # let mut results = Vec::new();
558 /// # for _ in 0..3 {
559 /// # results.push(stream.next().await.unwrap());
560 /// # }
561 /// # results.sort();
562 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
563 /// # }));
564 /// # }
565 /// ```
566 pub fn prefix_key<K2, F>(
567 self,
568 f: impl IntoQuotedMut<'a, F, L> + Copy,
569 ) -> KeyedStream<(K2, K), V, L, B, O, R>
570 where
571 F: Fn(&(K, V)) -> K2 + 'a,
572 {
573 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
574 let map_f = q!({
575 let orig = f;
576 move |kv| {
577 let out = orig(&kv);
578 ((out, kv.0), kv.1)
579 }
580 })
581 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
582 .into();
583
584 KeyedStream::new(
585 self.location.clone(),
586 HydroNode::Map {
587 f: map_f,
588 input: Box::new(self.ir_node.into_inner()),
589 metadata: self
590 .location
591 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
592 },
593 )
594 }
595
596 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
597 /// `f`, preserving the order of the elements within the group.
598 ///
599 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
600 /// not modify or take ownership of the values. If you need to modify the values while filtering
601 /// use [`KeyedStream::filter_map`] instead.
602 ///
603 /// # Example
604 /// ```rust
605 /// # #[cfg(feature = "deploy")] {
606 /// # use hydro_lang::prelude::*;
607 /// # use futures::StreamExt;
608 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
609 /// process
610 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
611 /// .into_keyed()
612 /// .filter(q!(|&x| x > 2))
613 /// # .entries()
614 /// # }, |mut stream| async move {
615 /// // { 1: [3], 2: [4] }
616 /// # let mut results = Vec::new();
617 /// # for _ in 0..2 {
618 /// # results.push(stream.next().await.unwrap());
619 /// # }
620 /// # results.sort();
621 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
622 /// # }));
623 /// # }
624 /// ```
625 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
626 where
627 F: Fn(&V) -> bool + 'a,
628 {
629 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
630 let filter_f = q!({
631 let orig = f;
632 move |t: &(_, _)| orig(&t.1)
633 })
634 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
635 .into();
636
637 KeyedStream::new(
638 self.location.clone(),
639 HydroNode::Filter {
640 f: filter_f,
641 input: Box::new(self.ir_node.into_inner()),
642 metadata: self.location.new_node_metadata(Self::collection_kind()),
643 },
644 )
645 }
646
647 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
648 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
649 ///
650 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
651 /// not modify or take ownership of the values. If you need to modify the values while filtering
652 /// use [`KeyedStream::filter_map_with_key`] instead.
653 ///
654 /// # Example
655 /// ```rust
656 /// # #[cfg(feature = "deploy")] {
657 /// # use hydro_lang::prelude::*;
658 /// # use futures::StreamExt;
659 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660 /// process
661 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
662 /// .into_keyed()
663 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
664 /// # .entries()
665 /// # }, |mut stream| async move {
666 /// // { 1: [3], 2: [4] }
667 /// # let mut results = Vec::new();
668 /// # for _ in 0..2 {
669 /// # results.push(stream.next().await.unwrap());
670 /// # }
671 /// # results.sort();
672 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
673 /// # }));
674 /// # }
675 /// ```
676 pub fn filter_with_key<F>(
677 self,
678 f: impl IntoQuotedMut<'a, F, L> + Copy,
679 ) -> KeyedStream<K, V, L, B, O, R>
680 where
681 F: Fn(&(K, V)) -> bool + 'a,
682 {
683 let filter_f = f
684 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
685 .into();
686
687 KeyedStream::new(
688 self.location.clone(),
689 HydroNode::Filter {
690 f: filter_f,
691 input: Box::new(self.ir_node.into_inner()),
692 metadata: self.location.new_node_metadata(Self::collection_kind()),
693 },
694 )
695 }
696
697 /// An operator that both filters and maps each value, with keys staying the same.
698 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
699 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
700 ///
701 /// # Example
702 /// ```rust
703 /// # #[cfg(feature = "deploy")] {
704 /// # use hydro_lang::prelude::*;
705 /// # use futures::StreamExt;
706 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707 /// process
708 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
709 /// .into_keyed()
710 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
711 /// # .entries()
712 /// # }, |mut stream| async move {
713 /// // { 1: [2], 2: [4] }
714 /// # let mut results = Vec::new();
715 /// # for _ in 0..2 {
716 /// # results.push(stream.next().await.unwrap());
717 /// # }
718 /// # results.sort();
719 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
720 /// # }));
721 /// # }
722 /// ```
723 pub fn filter_map<U, F>(
724 self,
725 f: impl IntoQuotedMut<'a, F, L> + Copy,
726 ) -> KeyedStream<K, U, L, B, O, R>
727 where
728 F: Fn(V) -> Option<U> + 'a,
729 {
730 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
731 let filter_map_f = q!({
732 let orig = f;
733 move |(k, v)| orig(v).map(|o| (k, o))
734 })
735 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
736 .into();
737
738 KeyedStream::new(
739 self.location.clone(),
740 HydroNode::FilterMap {
741 f: filter_map_f,
742 input: Box::new(self.ir_node.into_inner()),
743 metadata: self
744 .location
745 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
746 },
747 )
748 }
749
750 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
751 /// re-grouped even they are tuples; instead they will be grouped under the original key.
752 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
753 ///
754 /// # Example
755 /// ```rust
756 /// # #[cfg(feature = "deploy")] {
757 /// # use hydro_lang::prelude::*;
758 /// # use futures::StreamExt;
759 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
760 /// process
761 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
762 /// .into_keyed()
763 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
764 /// # .entries()
765 /// # }, |mut stream| async move {
766 /// // { 2: [2] }
767 /// # let mut results = Vec::new();
768 /// # for _ in 0..1 {
769 /// # results.push(stream.next().await.unwrap());
770 /// # }
771 /// # results.sort();
772 /// # assert_eq!(results, vec![(2, 2)]);
773 /// # }));
774 /// # }
775 /// ```
776 pub fn filter_map_with_key<U, F>(
777 self,
778 f: impl IntoQuotedMut<'a, F, L> + Copy,
779 ) -> KeyedStream<K, U, L, B, O, R>
780 where
781 F: Fn((K, V)) -> Option<U> + 'a,
782 K: Clone,
783 {
784 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
785 let filter_map_f = q!({
786 let orig = f;
787 move |(k, v)| {
788 let out = orig((Clone::clone(&k), v));
789 out.map(|o| (k, o))
790 }
791 })
792 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
793 .into();
794
795 KeyedStream::new(
796 self.location.clone(),
797 HydroNode::FilterMap {
798 f: filter_map_f,
799 input: Box::new(self.ir_node.into_inner()),
800 metadata: self
801 .location
802 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
803 },
804 )
805 }
806
807 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
808 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
809 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
810 ///
811 /// # Example
812 /// ```rust
813 /// # #[cfg(feature = "deploy")] {
814 /// # use hydro_lang::prelude::*;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817 /// let tick = process.tick();
818 /// let batch = process
819 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
820 /// .into_keyed()
821 /// .batch(&tick, nondet!(/** test */));
822 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
823 /// batch.cross_singleton(count).all_ticks().entries()
824 /// # }, |mut stream| async move {
825 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
826 /// # let mut results = Vec::new();
827 /// # for _ in 0..3 {
828 /// # results.push(stream.next().await.unwrap());
829 /// # }
830 /// # results.sort();
831 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
832 /// # }));
833 /// # }
834 /// ```
835 pub fn cross_singleton<O2>(
836 self,
837 other: impl Into<Optional<O2, L, Bounded>>,
838 ) -> KeyedStream<K, (V, O2), L, B, O, R>
839 where
840 O2: Clone,
841 {
842 let other: Optional<O2, L, Bounded> = other.into();
843 check_matching_location(&self.location, &other.location);
844
845 Stream::new(
846 self.location.clone(),
847 HydroNode::CrossSingleton {
848 left: Box::new(self.ir_node.into_inner()),
849 right: Box::new(other.ir_node.into_inner()),
850 metadata: self
851 .location
852 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
853 },
854 )
855 .map(q!(|((k, v), o2)| (k, (v, o2))))
856 .into_keyed()
857 }
858
859 /// For each value `v` in each group, transform `v` using `f` and then treat the
860 /// result as an [`Iterator`] to produce values one by one within the same group.
861 /// The implementation for [`Iterator`] for the output type `I` must produce items
862 /// in a **deterministic** order.
863 ///
864 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
865 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
866 ///
867 /// # Example
868 /// ```rust
869 /// # #[cfg(feature = "deploy")] {
870 /// # use hydro_lang::prelude::*;
871 /// # use futures::StreamExt;
872 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
873 /// process
874 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
875 /// .into_keyed()
876 /// .flat_map_ordered(q!(|x| x))
877 /// # .entries()
878 /// # }, |mut stream| async move {
879 /// // { 1: [2, 3, 4], 2: [5, 6] }
880 /// # let mut results = Vec::new();
881 /// # for _ in 0..5 {
882 /// # results.push(stream.next().await.unwrap());
883 /// # }
884 /// # results.sort();
885 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
886 /// # }));
887 /// # }
888 /// ```
889 pub fn flat_map_ordered<U, I, F>(
890 self,
891 f: impl IntoQuotedMut<'a, F, L> + Copy,
892 ) -> KeyedStream<K, U, L, B, O, R>
893 where
894 I: IntoIterator<Item = U>,
895 F: Fn(V) -> I + 'a,
896 K: Clone,
897 {
898 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
899 let flat_map_f = q!({
900 let orig = f;
901 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
902 })
903 .splice_fn1_ctx::<(K, V), _>(&self.location)
904 .into();
905
906 KeyedStream::new(
907 self.location.clone(),
908 HydroNode::FlatMap {
909 f: flat_map_f,
910 input: Box::new(self.ir_node.into_inner()),
911 metadata: self
912 .location
913 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
914 },
915 )
916 }
917
918 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
919 /// for the output type `I` to produce items in any order.
920 ///
921 /// # Example
922 /// ```rust
923 /// # #[cfg(feature = "deploy")] {
924 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
925 /// # use futures::StreamExt;
926 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
927 /// process
928 /// .source_iter(q!(vec![
929 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
930 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
931 /// ]))
932 /// .into_keyed()
933 /// .flat_map_unordered(q!(|x| x))
934 /// # .entries()
935 /// # }, |mut stream| async move {
936 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
937 /// # let mut results = Vec::new();
938 /// # for _ in 0..4 {
939 /// # results.push(stream.next().await.unwrap());
940 /// # }
941 /// # results.sort();
942 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
943 /// # }));
944 /// # }
945 /// ```
946 pub fn flat_map_unordered<U, I, F>(
947 self,
948 f: impl IntoQuotedMut<'a, F, L> + Copy,
949 ) -> KeyedStream<K, U, L, B, NoOrder, R>
950 where
951 I: IntoIterator<Item = U>,
952 F: Fn(V) -> I + 'a,
953 K: Clone,
954 {
955 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
956 let flat_map_f = q!({
957 let orig = f;
958 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
959 })
960 .splice_fn1_ctx::<(K, V), _>(&self.location)
961 .into();
962
963 KeyedStream::new(
964 self.location.clone(),
965 HydroNode::FlatMap {
966 f: flat_map_f,
967 input: Box::new(self.ir_node.into_inner()),
968 metadata: self
969 .location
970 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
971 },
972 )
973 }
974
975 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
976 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
977 /// items in a **deterministic** order.
978 ///
979 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
980 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
981 ///
982 /// # Example
983 /// ```rust
984 /// # #[cfg(feature = "deploy")] {
985 /// # use hydro_lang::prelude::*;
986 /// # use futures::StreamExt;
987 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
988 /// process
989 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
990 /// .into_keyed()
991 /// .flatten_ordered()
992 /// # .entries()
993 /// # }, |mut stream| async move {
994 /// // { 1: [2, 3, 4], 2: [5, 6] }
995 /// # let mut results = Vec::new();
996 /// # for _ in 0..5 {
997 /// # results.push(stream.next().await.unwrap());
998 /// # }
999 /// # results.sort();
1000 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1001 /// # }));
1002 /// # }
1003 /// ```
1004 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1005 where
1006 V: IntoIterator<Item = U>,
1007 K: Clone,
1008 {
1009 self.flat_map_ordered(q!(|d| d))
1010 }
1011
1012 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1013 /// for the value type `V` to produce items in any order.
1014 ///
1015 /// # Example
1016 /// ```rust
1017 /// # #[cfg(feature = "deploy")] {
1018 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1019 /// # use futures::StreamExt;
1020 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1021 /// process
1022 /// .source_iter(q!(vec![
1023 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1024 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1025 /// ]))
1026 /// .into_keyed()
1027 /// .flatten_unordered()
1028 /// # .entries()
1029 /// # }, |mut stream| async move {
1030 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1031 /// # let mut results = Vec::new();
1032 /// # for _ in 0..4 {
1033 /// # results.push(stream.next().await.unwrap());
1034 /// # }
1035 /// # results.sort();
1036 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1037 /// # }));
1038 /// # }
1039 /// ```
1040 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1041 where
1042 V: IntoIterator<Item = U>,
1043 K: Clone,
1044 {
1045 self.flat_map_unordered(q!(|d| d))
1046 }
1047
1048 /// An operator which allows you to "inspect" each element of a stream without
1049 /// modifying it. The closure `f` is called on a reference to each value. This is
1050 /// mainly useful for debugging, and should not be used to generate side-effects.
1051 ///
1052 /// # Example
1053 /// ```rust
1054 /// # #[cfg(feature = "deploy")] {
1055 /// # use hydro_lang::prelude::*;
1056 /// # use futures::StreamExt;
1057 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058 /// process
1059 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1060 /// .into_keyed()
1061 /// .inspect(q!(|v| println!("{}", v)))
1062 /// # .entries()
1063 /// # }, |mut stream| async move {
1064 /// # let mut results = Vec::new();
1065 /// # for _ in 0..3 {
1066 /// # results.push(stream.next().await.unwrap());
1067 /// # }
1068 /// # results.sort();
1069 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1070 /// # }));
1071 /// # }
1072 /// ```
1073 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1074 where
1075 F: Fn(&V) + 'a,
1076 {
1077 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1078 let inspect_f = q!({
1079 let orig = f;
1080 move |t: &(_, _)| orig(&t.1)
1081 })
1082 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1083 .into();
1084
1085 KeyedStream::new(
1086 self.location.clone(),
1087 HydroNode::Inspect {
1088 f: inspect_f,
1089 input: Box::new(self.ir_node.into_inner()),
1090 metadata: self.location.new_node_metadata(Self::collection_kind()),
1091 },
1092 )
1093 }
1094
1095 /// An operator which allows you to "inspect" each element of a stream without
1096 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1097 /// mainly useful for debugging, and should not be used to generate side-effects.
1098 ///
1099 /// # Example
1100 /// ```rust
1101 /// # #[cfg(feature = "deploy")] {
1102 /// # use hydro_lang::prelude::*;
1103 /// # use futures::StreamExt;
1104 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105 /// process
1106 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1107 /// .into_keyed()
1108 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1109 /// # .entries()
1110 /// # }, |mut stream| async move {
1111 /// # let mut results = Vec::new();
1112 /// # for _ in 0..3 {
1113 /// # results.push(stream.next().await.unwrap());
1114 /// # }
1115 /// # results.sort();
1116 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1117 /// # }));
1118 /// # }
1119 /// ```
1120 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1121 where
1122 F: Fn(&(K, V)) + 'a,
1123 {
1124 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1125
1126 KeyedStream::new(
1127 self.location.clone(),
1128 HydroNode::Inspect {
1129 f: inspect_f,
1130 input: Box::new(self.ir_node.into_inner()),
1131 metadata: self.location.new_node_metadata(Self::collection_kind()),
1132 },
1133 )
1134 }
1135
1136 /// An operator which allows you to "name" a `HydroNode`.
1137 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1138 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1139 {
1140 let mut node = self.ir_node.borrow_mut();
1141 let metadata = node.metadata_mut();
1142 metadata.tag = Some(name.to_string());
1143 }
1144 self
1145 }
1146}
1147
1148impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1149 KeyedStream<(K1, K2), V, L, B, O, R>
1150{
1151 /// Produces a new keyed stream by dropping the first element of the compound key.
1152 ///
1153 /// Because multiple keys may share the same suffix, this operation results in re-grouping
1154 /// of the values under the new keys. The values across groups with the same new key
1155 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1156 ///
1157 /// # Example
1158 /// ```rust
1159 /// # #[cfg(feature = "deploy")] {
1160 /// # use hydro_lang::prelude::*;
1161 /// # use futures::StreamExt;
1162 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1163 /// process
1164 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1165 /// .into_keyed()
1166 /// .drop_key_prefix()
1167 /// # .entries()
1168 /// # }, |mut stream| async move {
1169 /// // { 10: [2, 3], 20: [4] }
1170 /// # let mut results = Vec::new();
1171 /// # for _ in 0..3 {
1172 /// # results.push(stream.next().await.unwrap());
1173 /// # }
1174 /// # results.sort();
1175 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
1176 /// # }));
1177 /// # }
1178 /// ```
1179 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1180 self.entries()
1181 .map(q!(|((_k1, k2), v)| (k2, v)))
1182 .into_keyed()
1183 }
1184}
1185
1186impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1187 KeyedStream<K, V, L, Unbounded, O, R>
1188{
1189 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1190 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1191 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1192 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1193 ///
1194 /// Currently, both input streams must be [`Unbounded`].
1195 ///
1196 /// # Example
1197 /// ```rust
1198 /// # #[cfg(feature = "deploy")] {
1199 /// # use hydro_lang::prelude::*;
1200 /// # use futures::StreamExt;
1201 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1202 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
1203 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
1204 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
1205 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
1206 /// numbers1.interleave(numbers2)
1207 /// # .entries()
1208 /// # }, |mut stream| async move {
1209 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1210 /// # let mut results = Vec::new();
1211 /// # for _ in 0..4 {
1212 /// # results.push(stream.next().await.unwrap());
1213 /// # }
1214 /// # results.sort();
1215 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
1216 /// # }));
1217 /// # }
1218 /// ```
1219 pub fn interleave<O2: Ordering, R2: Retries>(
1220 self,
1221 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1222 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1223 where
1224 R: MinRetries<R2>,
1225 {
1226 let tick = self.location.tick();
1227 // Because the outputs are unordered, we can interleave batches from both streams.
1228 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1229 self.batch(&tick, nondet_batch_interleaving)
1230 .weakest_ordering()
1231 .chain(
1232 other
1233 .batch(&tick, nondet_batch_interleaving)
1234 .weakest_ordering(),
1235 )
1236 .all_ticks()
1237 }
1238}
1239
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant independently decides two things: whether an element is emitted for the current
/// input, and whether later inputs with the same key are still processed.
///
/// The derived traits only apply when `T` implements them (e.g. `Generate<T>: Copy` requires
/// `T: Copy`). This lets generator outputs be printed, compared and cloned, for example in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1252
impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
where
    L: Location<'a>,
{
    /// A special case of [`Stream::scan`] for keyed streams. Each key group is scanned
    /// independently, with its own accumulator created by `init`, and its values are transformed
    /// via the `f` combinator.
    ///
    /// Unlike [`KeyedStream::fold`], which only returns the final accumulated value, `scan`
    /// produces a new keyed stream containing every value returned by `f`, paired with its key.
    /// The scan of a group can also terminate early by returning `None`.
    ///
    /// The function takes a mutable reference to the group's accumulator and the current element,
    /// and returns an `Option<U>`. If it returns `Some(value)`, `value` is emitted under the
    /// group's key. If it returns `None`, nothing is emitted and that group is terminated: no
    /// further elements with the same key are processed. Other groups are unaffected.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    ///     .into_keyed()
    ///     .scan(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: [1], 1: [3, 7] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Wrap the user combinator so the `q!` block below can refer to it as `f`.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // `scan` is `generator` with `Some(out)` mapped to `Yield(out)` and `None` mapped to
        // `Break` (which ends the current key's group).
        self.generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }

    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// The initializer runs lazily, the first time a value with a given key is seen. Once the
    /// processing logic returns [`Generate::Return`] or [`Generate::Break`] for a key, all later
    /// values with that key are ignored. The per-key bookkeeping entry is currently kept in
    /// memory even after a group finishes.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_string()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_string()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_string()), (0, "even".to_string()), (1, "even".to_string())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Wrap the user closures so the `q!` blocks below can refer to them as `init` and `f`.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The keyed generator is lowered to a single un-keyed `Scan` over `(K, V)` pairs. Its
        // accumulator maps each key to `Some(state)` while the group is live, and to `None`
        // once the group has returned or broken.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
            .into();
        // Every branch returns an outer `Some`, so that one group finishing never stops the shared
        // scan (an outer `None` would presumably end it for all keys). The inner `Option` says
        // whether this input emits an output.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // Lazily create a key's state the first time one of its values is seen.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // This group already finished; ignore the value without emitting anything.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten away the `None` placeholders emitted for inputs that produce no output.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: self.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(self.location.clone(), flatten_node)
    }

    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. However, the aggregation function returns a
    /// boolean, and `true` indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// Each key produces at most one output: its accumulator at the moment `f` first returns
    /// `true`. Later values for that key are ignored, and a key for which `f` never returns `true`
    /// has no entry in the output.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Wrap the user closures so the `q!` blocks below can refer to them as `init` and `f`.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // The generator state is wrapped in `Option`, so the finished accumulator can be moved
        // out with `take()` when `f` signals completion.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // After `Return` the generator stops feeding this key, so the state is
                    // always `Some` whenever this closure runs.
                    unreachable!()
                }
            }),
        );

        // Each key emits at most one value, so the stream is reinterpreted as a keyed singleton
        // with value boundedness `B::WhenValueBounded`.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }

    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
    /// otherwise the first element would be non-deterministic.
    ///
    /// Built on [`KeyedStream::fold_early_stop`], stopping each group after its first value.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .first()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 3 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
    where
        K: Clone + Eq + Hash,
    {
        self.fold_early_stop(
            q!(|| None),
            q!(|acc, v| {
                *acc = Some(v);
                true
            }),
        )
        // The fold stops right after storing `Some(v)`, so this `unwrap` cannot fail.
        .map(q!(|v| v.unwrap()))
    }
}
1554
1555impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1556where
1557 L: Location<'a>,
1558{
1559 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1560 ///
1561 /// # Example
1562 /// ```rust
1563 /// # #[cfg(feature = "deploy")] {
1564 /// # use hydro_lang::prelude::*;
1565 /// # use futures::StreamExt;
1566 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1567 /// let tick = process.tick();
1568 /// let numbers = process
1569 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1570 /// .into_keyed();
1571 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1572 /// batch
1573 /// .value_counts()
1574 /// .entries()
1575 /// .all_ticks()
1576 /// # }, |mut stream| async move {
1577 /// // (1, 3), (2, 2)
1578 /// # let mut results = Vec::new();
1579 /// # for _ in 0..2 {
1580 /// # results.push(stream.next().await.unwrap());
1581 /// # }
1582 /// # results.sort();
1583 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1584 /// # }));
1585 /// # }
1586 /// ```
1587 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1588 where
1589 K: Eq + Hash,
1590 {
1591 self.assume_ordering_trusted(
1592 nondet!(/** ordering within each group affects neither result nor intermediates */),
1593 )
1594 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1595 }
1596}
1597
1598impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1599where
1600 L: Location<'a>,
1601{
1602 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1603 /// group via the `comb` closure.
1604 ///
1605 /// Depending on the input stream guarantees, the closure may need to be commutative
1606 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1607 ///
1608 /// If the input and output value types are the same and do not require initialization then use
1609 /// [`KeyedStream::reduce`].
1610 ///
1611 /// # Example
1612 /// ```rust
1613 /// # #[cfg(feature = "deploy")] {
1614 /// # use hydro_lang::prelude::*;
1615 /// # use futures::StreamExt;
1616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617 /// let tick = process.tick();
1618 /// let numbers = process
1619 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1620 /// .into_keyed();
1621 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1622 /// batch
1623 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1624 /// .entries()
1625 /// .all_ticks()
1626 /// # }, |mut stream| async move {
1627 /// // (1, false), (2, true)
1628 /// # let mut results = Vec::new();
1629 /// # for _ in 0..2 {
1630 /// # results.push(stream.next().await.unwrap());
1631 /// # }
1632 /// # results.sort();
1633 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1634 /// # }));
1635 /// # }
1636 /// ```
1637 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1638 self,
1639 init: impl IntoQuotedMut<'a, I, L>,
1640 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1641 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1642 where
1643 K: Eq + Hash,
1644 C: ValidCommutativityFor<O>,
1645 Idemp: ValidIdempotenceFor<R>,
1646 {
1647 let init = init.splice_fn0_ctx(&self.location).into();
1648 let comb = comb
1649 .splice_fn2_borrow_mut_ctx_props(&self.location)
1650 .0
1651 .into();
1652
1653 let ordered = self
1654 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1655 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1656
1657 KeyedSingleton::new(
1658 ordered.location.clone(),
1659 HydroNode::FoldKeyed {
1660 init,
1661 acc: comb,
1662 input: Box::new(ordered.ir_node.into_inner()),
1663 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1664 K,
1665 A,
1666 L,
1667 B::WhenValueUnbounded,
1668 >::collection_kind()),
1669 },
1670 )
1671 }
1672
1673 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1674 /// group via the `comb` closure.
1675 ///
1676 /// Depending on the input stream guarantees, the closure may need to be commutative
1677 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1678 ///
1679 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1680 ///
1681 /// # Example
1682 /// ```rust
1683 /// # #[cfg(feature = "deploy")] {
1684 /// # use hydro_lang::prelude::*;
1685 /// # use futures::StreamExt;
1686 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1687 /// let tick = process.tick();
1688 /// let numbers = process
1689 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1690 /// .into_keyed();
1691 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1692 /// batch
1693 /// .reduce(q!(|acc, x| *acc |= x))
1694 /// .entries()
1695 /// .all_ticks()
1696 /// # }, |mut stream| async move {
1697 /// // (1, false), (2, true)
1698 /// # let mut results = Vec::new();
1699 /// # for _ in 0..2 {
1700 /// # results.push(stream.next().await.unwrap());
1701 /// # }
1702 /// # results.sort();
1703 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1704 /// # }));
1705 /// # }
1706 /// ```
1707 pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1708 self,
1709 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1710 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1711 where
1712 K: Eq + Hash,
1713 C: ValidCommutativityFor<O>,
1714 Idemp: ValidIdempotenceFor<R>,
1715 {
1716 let f = comb
1717 .splice_fn2_borrow_mut_ctx_props(&self.location)
1718 .0
1719 .into();
1720
1721 let ordered = self
1722 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1723 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1724
1725 KeyedSingleton::new(
1726 ordered.location.clone(),
1727 HydroNode::ReduceKeyed {
1728 f,
1729 input: Box::new(ordered.ir_node.into_inner()),
1730 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1731 K,
1732 V,
1733 L,
1734 B::WhenValueUnbounded,
1735 >::collection_kind()),
1736 },
1737 )
1738 }
1739
1740 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1741 /// are automatically deleted.
1742 ///
1743 /// Depending on the input stream guarantees, the closure may need to be commutative
1744 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1745 ///
1746 /// # Example
1747 /// ```rust
1748 /// # #[cfg(feature = "deploy")] {
1749 /// # use hydro_lang::prelude::*;
1750 /// # use futures::StreamExt;
1751 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1752 /// let tick = process.tick();
1753 /// let watermark = tick.singleton(q!(1));
1754 /// let numbers = process
1755 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1756 /// .into_keyed();
1757 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1758 /// batch
1759 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1760 /// .entries()
1761 /// .all_ticks()
1762 /// # }, |mut stream| async move {
1763 /// // (2, true)
1764 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1765 /// # }));
1766 /// # }
1767 /// ```
1768 pub fn reduce_watermark<O2, F, C, Idemp>(
1769 self,
1770 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1771 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1772 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1773 where
1774 K: Eq + Hash,
1775 O2: Clone,
1776 F: Fn(&mut V, V) + 'a,
1777 C: ValidCommutativityFor<O>,
1778 Idemp: ValidIdempotenceFor<R>,
1779 {
1780 let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1781 check_matching_location(&self.location.root(), other.location.outer());
1782 let f = comb
1783 .splice_fn2_borrow_mut_ctx_props(&self.location)
1784 .0
1785 .into();
1786
1787 let ordered = self
1788 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1789 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1790
1791 KeyedSingleton::new(
1792 ordered.location.clone(),
1793 HydroNode::ReduceKeyedWatermark {
1794 f,
1795 input: Box::new(ordered.ir_node.into_inner()),
1796 watermark: Box::new(other.ir_node.into_inner()),
1797 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1798 K,
1799 V,
1800 L,
1801 B::WhenValueUnbounded,
1802 >::collection_kind()),
1803 },
1804 )
1805 }
1806
1807 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1808 /// whose keys are not in the bounded stream.
1809 ///
1810 /// # Example
1811 /// ```rust
1812 /// # #[cfg(feature = "deploy")] {
1813 /// # use hydro_lang::prelude::*;
1814 /// # use futures::StreamExt;
1815 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1816 /// let tick = process.tick();
1817 /// let keyed_stream = process
1818 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1819 /// .batch(&tick, nondet!(/** test */))
1820 /// .into_keyed();
1821 /// let keys_to_remove = process
1822 /// .source_iter(q!(vec![1, 2]))
1823 /// .batch(&tick, nondet!(/** test */));
1824 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1825 /// # .entries()
1826 /// # }, |mut stream| async move {
1827 /// // { 3: ['c'], 4: ['d'] }
1828 /// # let mut results = Vec::new();
1829 /// # for _ in 0..2 {
1830 /// # results.push(stream.next().await.unwrap());
1831 /// # }
1832 /// # results.sort();
1833 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1834 /// # }));
1835 /// # }
1836 /// ```
1837 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1838 self,
1839 other: Stream<K, L, Bounded, O2, R2>,
1840 ) -> Self
1841 where
1842 K: Eq + Hash,
1843 {
1844 check_matching_location(&self.location, &other.location);
1845
1846 KeyedStream::new(
1847 self.location.clone(),
1848 HydroNode::AntiJoin {
1849 pos: Box::new(self.ir_node.into_inner()),
1850 neg: Box::new(other.ir_node.into_inner()),
1851 metadata: self.location.new_node_metadata(Self::collection_kind()),
1852 },
1853 )
1854 }
1855}
1856
1857impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1858where
1859 L: Location<'a>,
1860{
1861 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1862 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1863 ///
1864 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1865 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1866 /// argument that declares where the stream will be atomically processed. Batching a stream into
1867 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1868 /// [`Tick`] will introduce asynchrony.
1869 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1870 let out_location = Atomic { tick: tick.clone() };
1871 KeyedStream::new(
1872 out_location.clone(),
1873 HydroNode::BeginAtomic {
1874 inner: Box::new(self.ir_node.into_inner()),
1875 metadata: out_location
1876 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1877 },
1878 )
1879 }
1880
1881 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1882 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1883 /// the order of the input.
1884 ///
1885 /// # Non-Determinism
1886 /// The batch boundaries are non-deterministic and may change across executions.
1887 pub fn batch(
1888 self,
1889 tick: &Tick<L>,
1890 nondet: NonDet,
1891 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1892 let _ = nondet;
1893 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1894 KeyedStream::new(
1895 tick.clone(),
1896 HydroNode::Batch {
1897 inner: Box::new(self.ir_node.into_inner()),
1898 metadata: tick.new_node_metadata(
1899 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1900 ),
1901 },
1902 )
1903 }
1904}
1905
1906impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1907where
1908 L: Location<'a> + NoTick,
1909{
1910 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1911 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1912 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1913 /// used to create the atomic section.
1914 ///
1915 /// # Non-Determinism
1916 /// The batch boundaries are non-deterministic and may change across executions.
1917 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1918 let _ = nondet;
1919 KeyedStream::new(
1920 self.location.clone().tick,
1921 HydroNode::Batch {
1922 inner: Box::new(self.ir_node.into_inner()),
1923 metadata: self.location.tick.new_node_metadata(KeyedStream::<
1924 K,
1925 V,
1926 Tick<L>,
1927 Bounded,
1928 O,
1929 R,
1930 >::collection_kind(
1931 )),
1932 },
1933 )
1934 }
1935
1936 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1937 /// See [`KeyedStream::atomic`] for more details.
1938 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1939 KeyedStream::new(
1940 self.location.tick.l.clone(),
1941 HydroNode::EndAtomic {
1942 inner: Box::new(self.ir_node.into_inner()),
1943 metadata: self
1944 .location
1945 .tick
1946 .l
1947 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1948 },
1949 )
1950 }
1951}
1952
1953impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1954where
1955 L: Location<'a>,
1956{
1957 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1958 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1959 /// is only present in one of the inputs, its values are passed through as-is). The output has
1960 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1961 ///
1962 /// Currently, both input streams must be [`Bounded`]. This operator will block
1963 /// on the first stream until all its elements are available. In a future version,
1964 /// we will relax the requirement on the `other` stream.
1965 ///
1966 /// # Example
1967 /// ```rust
1968 /// # #[cfg(feature = "deploy")] {
1969 /// # use hydro_lang::prelude::*;
1970 /// # use futures::StreamExt;
1971 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1972 /// let tick = process.tick();
1973 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1974 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1975 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1976 /// # .entries()
1977 /// # }, |mut stream| async move {
1978 /// // { 0: [2, 1], 1: [4, 3] }
1979 /// # let mut results = Vec::new();
1980 /// # for _ in 0..4 {
1981 /// # results.push(stream.next().await.unwrap());
1982 /// # }
1983 /// # results.sort();
1984 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
1985 /// # }));
1986 /// # }
1987 /// ```
1988 pub fn chain<O2: Ordering, R2: Retries>(
1989 self,
1990 other: KeyedStream<K, V, L, Bounded, O2, R2>,
1991 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1992 where
1993 O: MinOrder<O2>,
1994 R: MinRetries<R2>,
1995 {
1996 check_matching_location(&self.location, &other.location);
1997
1998 KeyedStream::new(
1999 self.location.clone(),
2000 HydroNode::Chain {
2001 first: Box::new(self.ir_node.into_inner()),
2002 second: Box::new(other.ir_node.into_inner()),
2003 metadata: self.location.new_node_metadata(KeyedStream::<
2004 K,
2005 V,
2006 L,
2007 Bounded,
2008 <O as MinOrder<O2>>::Min,
2009 <R as MinRetries<R2>>::Min,
2010 >::collection_kind()),
2011 },
2012 )
2013 }
2014}
2015
2016impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2017where
2018 L: Location<'a>,
2019{
2020 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2021 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2022 /// each key.
2023 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2024 KeyedStream::new(
2025 self.location.outer().clone(),
2026 HydroNode::YieldConcat {
2027 inner: Box::new(self.ir_node.into_inner()),
2028 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2029 K,
2030 V,
2031 L,
2032 Unbounded,
2033 O,
2034 R,
2035 >::collection_kind(
2036 )),
2037 },
2038 )
2039 }
2040
2041 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2042 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2043 /// each key.
2044 ///
2045 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2046 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2047 /// stream's [`Tick`] context.
2048 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2049 let out_location = Atomic {
2050 tick: self.location.clone(),
2051 };
2052
2053 KeyedStream::new(
2054 out_location.clone(),
2055 HydroNode::YieldConcat {
2056 inner: Box::new(self.ir_node.into_inner()),
2057 metadata: out_location.new_node_metadata(KeyedStream::<
2058 K,
2059 V,
2060 Atomic<L>,
2061 Unbounded,
2062 O,
2063 R,
2064 >::collection_kind()),
2065 },
2066 )
2067 }
2068
2069 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2070 /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2071 ///
2072 /// This API is particularly useful for stateful computation on batches of data, such as
2073 /// maintaining an accumulated state that is up to date with the current batch.
2074 ///
2075 /// # Example
2076 /// ```rust
2077 /// # #[cfg(feature = "deploy")] {
2078 /// # use hydro_lang::prelude::*;
2079 /// # use futures::StreamExt;
2080 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2081 /// let tick = process.tick();
2082 /// # // ticks are lazy by default, forces the second tick to run
2083 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2084 /// # let batch_first_tick = process
2085 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2086 /// # .into_keyed()
2087 /// # .batch(&tick, nondet!(/** test */));
2088 /// # let batch_second_tick = process
2089 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2090 /// # .into_keyed()
2091 /// # .batch(&tick, nondet!(/** test */))
2092 /// # .defer_tick(); // appears on the second tick
2093 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2094 ///
2095 /// input.batch(&tick, nondet!(/** test */))
2096 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2097 /// *sum += new;
2098 /// }))).entries().all_ticks()
2099 /// # }, |mut stream| async move {
2100 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2101 /// # let mut results = Vec::new();
2102 /// # for _ in 0..4 {
2103 /// # results.push(stream.next().await.unwrap());
2104 /// # }
2105 /// # results.sort();
2106 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2107 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2108 /// # results.clear();
2109 /// # for _ in 0..4 {
2110 /// # results.push(stream.next().await.unwrap());
2111 /// # }
2112 /// # results.sort();
2113 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2114 /// # }));
2115 /// # }
2116 /// ```
2117 pub fn across_ticks<Out: BatchAtomic>(
2118 self,
2119 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2120 ) -> Out::Batched {
2121 thunk(self.all_ticks_atomic()).batched_atomic()
2122 }
2123
2124 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2125 /// tick `T` always has the entries of `self` at tick `T - 1`.
2126 ///
2127 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2128 ///
2129 /// This operator enables stateful iterative processing with ticks, by sending data from one
2130 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2131 ///
2132 /// # Example
2133 /// ```rust
2134 /// # #[cfg(feature = "deploy")] {
2135 /// # use hydro_lang::prelude::*;
2136 /// # use futures::StreamExt;
2137 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2138 /// let tick = process.tick();
2139 /// # // ticks are lazy by default, forces the second tick to run
2140 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2141 /// # let batch_first_tick = process
2142 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2143 /// # .batch(&tick, nondet!(/** test */))
2144 /// # .into_keyed();
2145 /// # let batch_second_tick = process
2146 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2147 /// # .batch(&tick, nondet!(/** test */))
2148 /// # .defer_tick()
2149 /// # .into_keyed(); // appears on the second tick
2150 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2151 /// # batch_first_tick.chain(batch_second_tick);
2152 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2153 /// changes_across_ticks // from the current tick
2154 /// )
2155 /// # .entries().all_ticks()
2156 /// # }, |mut stream| async move {
2157 /// // First tick: { 1: [2, 3] }
2158 /// # let mut results = Vec::new();
2159 /// # for _ in 0..2 {
2160 /// # results.push(stream.next().await.unwrap());
2161 /// # }
2162 /// # results.sort();
2163 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2164 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2165 /// # results.clear();
2166 /// # for _ in 0..4 {
2167 /// # results.push(stream.next().await.unwrap());
2168 /// # }
2169 /// # results.sort();
2170 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2171 /// // Third tick: { 1: [4], 2: [5] }
2172 /// # results.clear();
2173 /// # for _ in 0..2 {
2174 /// # results.push(stream.next().await.unwrap());
2175 /// # }
2176 /// # results.sort();
2177 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2178 /// # }));
2179 /// # }
2180 /// ```
2181 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2182 KeyedStream::new(
2183 self.location.clone(),
2184 HydroNode::DeferTick {
2185 input: Box::new(self.ir_node.into_inner()),
2186 metadata: self.location.new_node_metadata(KeyedStream::<
2187 K,
2188 V,
2189 Tick<L>,
2190 Bounded,
2191 O,
2192 R,
2193 >::collection_kind()),
2194 },
2195 )
2196 }
2197}
2198
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;
    #[cfg(feature = "deploy")]
    use crate::properties::ManualProof;

    // `reduce_watermark` over an unbounded source with a constant watermark of 1. The test
    // expects the first emitted entry to be key 2 (102 + 102), which relies on keys 0 and 1
    // not being emitted under that watermark.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Same scenario as `reduce_watermark_filter`, but with a bounded `source_iter` input, so the
    // reduced entries are sent directly without snapshotting into a tick.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // The watermark starts at 1 and is incremented every tick through a cycle. An externally
    // triggered tick injects `(3, 103)`; the test expects that entry to be emitted after
    // `(2, 204)`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // Emits `(3, 103)` only in ticks where an external trigger message was received.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                // Presumably required because `interleave` weakens the input ordering.
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = ManualProof(/* integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    // Nondeterministic batching can place `(1, 1)` and `(1, 2)` in different ticks, in which
    // case the per-tick fold for key 1 is not `[1, 2]`. The exhaustive simulator must find such
    // an execution, so the assertion panics. Key 2's output `(2, [3])` is listed explicitly so
    // that the panic can only be caused by key 1 being split across batches.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2]), (2, vec![3])])
                .await;
        });
    }

    // With a totally ordered input, batching must never reorder elements within a key, so the
    // early-stopping max fold always sees `1` before `2` for key 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    // With an unordered input, the simulator additionally explores permutations of elements
    // within a key across batch boundaries, which increases the instance count.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weakest_ordering();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }

    // `assume_ordering` on an unordered batch is unsound. The simulator is expected to find an
    // execution where `first()` observes a value other than `1` for some key, so this panics.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }

    // Pins the number of executions the simulator explores when an unordered batch is
    // reinterpreted as totally ordered.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
}