hydro_lang/live_collections/keyed_stream/mod.rs
//! Definitions for the [`KeyedStream`] live collection.

use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;

use stageleft::{IntoQuotedMut, QuotedWithContext, q};

use super::boundedness::{Bounded, Boundedness, Unbounded};
use super::keyed_singleton::KeyedSingleton;
use super::optional::Optional;
use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
use crate::compile::ir::{
    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
};
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, ReceiverComplete};
use crate::forward_handle::{ForwardRef, TickCycle};
use crate::live_collections::stream::{Ordering, Retries};
#[cfg(stageleft_runtime)]
use crate::location::dynamic::{DynLocation, LocationId};
use crate::location::tick::DeferTick;
use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
use crate::manual_expr::ManualExpr;
use crate::nondet::{NonDet, nondet};

pub mod networking;

/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
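///
/// # Example
/// A minimal sketch (mirroring the doctests on the methods below, not independently verified)
/// of building a keyed stream and aggregating each group, assuming `process` and `tick` are set
/// up as in those doctests:
/// ```rust,ignore
/// let keyed = process
///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
///     .into_keyed(); // values grouped by the first tuple element
/// let batch = keyed.batch(&tick, nondet!(/** example */));
/// // `fold` materializes the groups; this is where `K: Hash + Eq` is required.
/// let sums = batch.fold(q!(|| 0), q!(|acc, x| *acc += x));
/// ```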
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    pub(crate) location: Loc,
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}

64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65 for KeyedStream<K, V, L, B, NoOrder, R>
66where
67 L: Location<'a>,
68{
69 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70 KeyedStream {
71 location: stream.location,
72 ir_node: stream.ir_node,
73 _phantom: PhantomData,
74 }
75 }
76}
77
78impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
79where
80 L: Location<'a>,
81{
82 fn defer_tick(self) -> Self {
83 KeyedStream::defer_tick(self)
84 }
85}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90 L: Location<'a>,
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 KeyedStream {
96 location: location.clone(),
97 ir_node: RefCell::new(HydroNode::CycleSource {
98 ident,
99 metadata: location.new_node_metadata(
100 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101 ),
102 }),
103 _phantom: PhantomData,
104 }
105 }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132 for KeyedStream<K, V, L, B, O, R>
133where
134 L: Location<'a> + NoTick,
135{
136 type Location = L;
137
138 fn create_source(ident: syn::Ident, location: L) -> Self {
139 KeyedStream {
140 location: location.clone(),
141 ir_node: RefCell::new(HydroNode::CycleSource {
142 ident,
143 metadata: location
144 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152 for KeyedStream<K, V, L, B, O, R>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
    fn clone(&self) -> Self {
        // On the first clone, convert this node into a `Tee` in place so that the original
        // handle and every clone share the same underlying IR node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // Every clone is a new `Tee` handle pointing at the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202 KeyedStream<K, V, L, B, O, R>
203{
204 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208 KeyedStream {
209 location,
210 ir_node: RefCell::new(ir_node),
211 _phantom: PhantomData,
212 }
213 }
214
215 /// Returns the [`CollectionKind`] corresponding to this type.
216 pub fn collection_kind() -> CollectionKind {
217 CollectionKind::KeyedStream {
218 bound: B::BOUND_KIND,
219 value_order: O::ORDERING_KIND,
220 value_retry: R::RETRIES_KIND,
221 key_type: stageleft::quote_type::<K>().into(),
222 value_type: stageleft::quote_type::<V>().into(),
223 }
224 }
225
226 /// Returns the [`Location`] where this keyed stream is being materialized.
227 pub fn location(&self) -> &L {
228 &self.location
229 }
230
    /// Explicitly "casts" the keyed stream to a type with a different ordering
    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
    /// by the type system.
    ///
    /// # Non-Determinism
    /// This function is an escape hatch, and any mistake in the provided ordering guarantee
    /// will propagate into the guarantees for the rest of the program.
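    ///
    /// # Example
    /// A minimal sketch (not a verified doctest), following the patterns of the other examples
    /// in this module; the justification inside `nondet!` is illustrative only:
    /// ```rust,ignore
    /// // Deliberately weaken a keyed stream to `NoOrder`, then cast it back once we know each
    /// // group really was produced in a deterministic order.
    /// let unordered = process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .weakest_ordering();
    /// let ordered = unordered.assume_ordering::<TotalOrder>(nondet!(/** single deterministic source */));
    /// ```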
239 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240 if O::ORDERING_KIND == O2::ORDERING_KIND {
241 KeyedStream::new(self.location, self.ir_node.into_inner())
242 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243 // We can always weaken the ordering guarantee
244 KeyedStream::new(
245 self.location.clone(),
246 HydroNode::Cast {
247 inner: Box::new(self.ir_node.into_inner()),
248 metadata: self
249 .location
250 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251 },
252 )
253 } else {
254 KeyedStream::new(
255 self.location.clone(),
256 HydroNode::ObserveNonDet {
257 inner: Box::new(self.ir_node.into_inner()),
258 trusted: false,
259 metadata: self
260 .location
261 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262 },
263 )
264 }
265 }
266
267 fn assume_ordering_trusted<O2: Ordering>(
268 self,
269 _nondet: NonDet,
270 ) -> KeyedStream<K, V, L, B, O2, R> {
271 if O::ORDERING_KIND == O2::ORDERING_KIND {
272 KeyedStream::new(self.location, self.ir_node.into_inner())
273 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
274 // We can always weaken the ordering guarantee
275 KeyedStream::new(
276 self.location.clone(),
277 HydroNode::Cast {
278 inner: Box::new(self.ir_node.into_inner()),
279 metadata: self
280 .location
281 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
282 },
283 )
284 } else {
285 KeyedStream::new(
286 self.location.clone(),
287 HydroNode::ObserveNonDet {
288 inner: Box::new(self.ir_node.into_inner()),
289 trusted: true,
290 metadata: self
291 .location
292 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
293 },
294 )
295 }
296 }
297
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
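    ///
    /// # Example
    /// A one-line sketch (not a verified doctest); `keyed` stands in for any `KeyedStream`:
    /// ```rust,ignore
    /// let unordered = keyed.weakest_ordering(); // now KeyedStream<_, _, _, _, NoOrder, _>
    /// ```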
300 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
301 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
302 self.assume_ordering::<NoOrder>(nondet)
303 }
304
    /// Explicitly "casts" the keyed stream to a type with a different retries
    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
    /// be proven by the type system.
    ///
    /// # Non-Determinism
    /// This function is an escape hatch, and any mistake in the provided retries guarantee
    /// will propagate into the guarantees for the rest of the program.
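    ///
    /// # Example
    /// A minimal sketch (not a verified doctest); `retried` stands in for a keyed stream whose
    /// transport may redeliver elements, and the justification inside `nondet!` is illustrative
    /// only:
    /// ```rust,ignore
    /// use hydro_lang::live_collections::stream::ExactlyOnce;
    /// // Claim that duplicates have already been filtered out upstream.
    /// let deduped = retried.assume_retries::<ExactlyOnce>(nondet!(/** upstream deduplicates by request id */));
    /// ```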
313 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
314 if R::RETRIES_KIND == R2::RETRIES_KIND {
315 KeyedStream::new(self.location, self.ir_node.into_inner())
316 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
317 // We can always weaken the retries guarantee
318 KeyedStream::new(
319 self.location.clone(),
320 HydroNode::Cast {
321 inner: Box::new(self.ir_node.into_inner()),
322 metadata: self
323 .location
324 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
325 },
326 )
327 } else {
328 KeyedStream::new(
329 self.location.clone(),
330 HydroNode::ObserveNonDet {
331 inner: Box::new(self.ir_node.into_inner()),
332 trusted: false,
333 metadata: self
334 .location
335 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
336 },
337 )
338 }
339 }
340
341 /// Flattens the keyed stream into an unordered stream of key-value pairs.
342 ///
343 /// # Example
344 /// ```rust
345 /// # #[cfg(feature = "deploy")] {
346 /// # use hydro_lang::prelude::*;
347 /// # use futures::StreamExt;
348 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
349 /// process
350 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
351 /// .into_keyed()
352 /// .entries()
353 /// # }, |mut stream| async move {
354 /// // (1, 2), (1, 3), (2, 4) in any order
355 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
356 /// # assert_eq!(stream.next().await.unwrap(), w);
357 /// # }
358 /// # }));
359 /// # }
360 /// ```
361 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
362 Stream::new(
363 self.location.clone(),
364 HydroNode::Cast {
365 inner: Box::new(self.ir_node.into_inner()),
366 metadata: self
367 .location
368 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
369 },
370 )
371 }
372
373 /// Flattens the keyed stream into an unordered stream of only the values.
374 ///
375 /// # Example
376 /// ```rust
377 /// # #[cfg(feature = "deploy")] {
378 /// # use hydro_lang::prelude::*;
379 /// # use futures::StreamExt;
380 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
381 /// process
382 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
383 /// .into_keyed()
384 /// .values()
385 /// # }, |mut stream| async move {
386 /// // 2, 3, 4 in any order
387 /// # for w in vec![2, 3, 4] {
388 /// # assert_eq!(stream.next().await.unwrap(), w);
389 /// # }
390 /// # }));
391 /// # }
392 /// ```
393 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
394 self.entries().map(q!(|(_, v)| v))
395 }
396
397 /// Transforms each value by invoking `f` on each element, with keys staying the same
398 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
399 ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item, use [`KeyedStream::inspect`] instead.
402 ///
403 /// # Example
404 /// ```rust
405 /// # #[cfg(feature = "deploy")] {
406 /// # use hydro_lang::prelude::*;
407 /// # use futures::StreamExt;
408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
409 /// process
410 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
411 /// .into_keyed()
412 /// .map(q!(|v| v + 1))
413 /// # .entries()
414 /// # }, |mut stream| async move {
415 /// // { 1: [3, 4], 2: [5] }
416 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
417 /// # assert_eq!(stream.next().await.unwrap(), w);
418 /// # }
419 /// # }));
420 /// # }
421 /// ```
422 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
423 where
424 F: Fn(V) -> U + 'a,
425 {
426 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
427 let map_f = q!({
428 let orig = f;
429 move |(k, v)| (k, orig(v))
430 })
431 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
432 .into();
433
434 KeyedStream::new(
435 self.location.clone(),
436 HydroNode::Map {
437 f: map_f,
438 input: Box::new(self.ir_node.into_inner()),
439 metadata: self
440 .location
441 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
442 },
443 )
444 }
445
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; they remain grouped under the original key.
448 ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item, use [`KeyedStream::inspect_with_key`] instead.
451 ///
452 /// # Example
453 /// ```rust
454 /// # #[cfg(feature = "deploy")] {
455 /// # use hydro_lang::prelude::*;
456 /// # use futures::StreamExt;
457 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458 /// process
459 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
460 /// .into_keyed()
461 /// .map_with_key(q!(|(k, v)| k + v))
462 /// # .entries()
463 /// # }, |mut stream| async move {
464 /// // { 1: [3, 4], 2: [6] }
465 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
466 /// # assert_eq!(stream.next().await.unwrap(), w);
467 /// # }
468 /// # }));
469 /// # }
470 /// ```
471 pub fn map_with_key<U, F>(
472 self,
473 f: impl IntoQuotedMut<'a, F, L> + Copy,
474 ) -> KeyedStream<K, U, L, B, O, R>
475 where
476 F: Fn((K, V)) -> U + 'a,
477 K: Clone,
478 {
479 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
480 let map_f = q!({
481 let orig = f;
482 move |(k, v)| {
483 let out = orig((Clone::clone(&k), v));
484 (k, out)
485 }
486 })
487 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
488 .into();
489
490 KeyedStream::new(
491 self.location.clone(),
492 HydroNode::Map {
493 f: map_f,
494 input: Box::new(self.ir_node.into_inner()),
495 metadata: self
496 .location
497 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
498 },
499 )
500 }
501
502 /// Prepends a new value to the key of each element in the stream, producing a new
503 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
504 /// occurs and the elements in each group preserve their original order.
505 ///
506 /// # Example
507 /// ```rust
508 /// # #[cfg(feature = "deploy")] {
509 /// # use hydro_lang::prelude::*;
510 /// # use futures::StreamExt;
511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512 /// process
513 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
514 /// .into_keyed()
515 /// .prefix_key(q!(|&(k, _)| k % 2))
516 /// # .entries()
517 /// # }, |mut stream| async move {
518 /// // { (1, 1): [2, 3], (0, 2): [4] }
519 /// # for w in vec![((1, 1), 2), ((1, 1), 3), ((0, 2), 4)] {
520 /// # assert_eq!(stream.next().await.unwrap(), w);
521 /// # }
522 /// # }));
523 /// # }
524 /// ```
525 pub fn prefix_key<K2, F>(
526 self,
527 f: impl IntoQuotedMut<'a, F, L> + Copy,
528 ) -> KeyedStream<(K2, K), V, L, B, O, R>
529 where
530 F: Fn(&(K, V)) -> K2 + 'a,
531 {
532 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
533 let map_f = q!({
534 let orig = f;
535 move |kv| {
536 let out = orig(&kv);
537 ((out, kv.0), kv.1)
538 }
539 })
540 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
541 .into();
542
543 KeyedStream::new(
544 self.location.clone(),
545 HydroNode::Map {
546 f: map_f,
547 input: Box::new(self.ir_node.into_inner()),
548 metadata: self
549 .location
550 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
551 },
552 )
553 }
554
555 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
556 /// `f`, preserving the order of the elements within the group.
557 ///
558 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedStream::filter_map`] instead.
561 ///
562 /// # Example
563 /// ```rust
564 /// # #[cfg(feature = "deploy")] {
565 /// # use hydro_lang::prelude::*;
566 /// # use futures::StreamExt;
567 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
568 /// process
569 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
570 /// .into_keyed()
571 /// .filter(q!(|&x| x > 2))
572 /// # .entries()
573 /// # }, |mut stream| async move {
574 /// // { 1: [3], 2: [4] }
575 /// # for w in vec![(1, 3), (2, 4)] {
576 /// # assert_eq!(stream.next().await.unwrap(), w);
577 /// # }
578 /// # }));
579 /// # }
580 /// ```
581 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
582 where
583 F: Fn(&V) -> bool + 'a,
584 {
585 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
586 let filter_f = q!({
587 let orig = f;
588 move |t: &(_, _)| orig(&t.1)
589 })
590 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
591 .into();
592
593 KeyedStream::new(
594 self.location.clone(),
595 HydroNode::Filter {
596 f: filter_f,
597 input: Box::new(self.ir_node.into_inner()),
598 metadata: self.location.new_node_metadata(Self::collection_kind()),
599 },
600 )
601 }
602
603 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
604 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
605 ///
606 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedStream::filter_map_with_key`] instead.
609 ///
610 /// # Example
611 /// ```rust
612 /// # #[cfg(feature = "deploy")] {
613 /// # use hydro_lang::prelude::*;
614 /// # use futures::StreamExt;
615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616 /// process
617 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
618 /// .into_keyed()
619 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
620 /// # .entries()
621 /// # }, |mut stream| async move {
622 /// // { 1: [3], 2: [4] }
623 /// # for w in vec![(1, 3), (2, 4)] {
624 /// # assert_eq!(stream.next().await.unwrap(), w);
625 /// # }
626 /// # }));
627 /// # }
628 /// ```
629 pub fn filter_with_key<F>(
630 self,
631 f: impl IntoQuotedMut<'a, F, L> + Copy,
632 ) -> KeyedStream<K, V, L, B, O, R>
633 where
634 F: Fn(&(K, V)) -> bool + 'a,
635 {
636 let filter_f = f
637 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
638 .into();
639
640 KeyedStream::new(
641 self.location.clone(),
642 HydroNode::Filter {
643 f: filter_f,
644 input: Box::new(self.ir_node.into_inner()),
645 metadata: self.location.new_node_metadata(Self::collection_kind()),
646 },
647 )
648 }
649
650 /// An operator that both filters and maps each value, with keys staying the same.
651 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
652 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
653 ///
654 /// # Example
655 /// ```rust
656 /// # #[cfg(feature = "deploy")] {
657 /// # use hydro_lang::prelude::*;
658 /// # use futures::StreamExt;
659 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660 /// process
661 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
662 /// .into_keyed()
663 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
664 /// # .entries()
665 /// # }, |mut stream| async move {
666 /// // { 1: [2], 2: [4] }
667 /// # for w in vec![(1, 2), (2, 4)] {
668 /// # assert_eq!(stream.next().await.unwrap(), w);
669 /// # }
670 /// # }));
671 /// # }
672 /// ```
673 pub fn filter_map<U, F>(
674 self,
675 f: impl IntoQuotedMut<'a, F, L> + Copy,
676 ) -> KeyedStream<K, U, L, B, O, R>
677 where
678 F: Fn(V) -> Option<U> + 'a,
679 {
680 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
681 let filter_map_f = q!({
682 let orig = f;
683 move |(k, v)| orig(v).map(|o| (k, o))
684 })
685 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
686 .into();
687
688 KeyedStream::new(
689 self.location.clone(),
690 HydroNode::FilterMap {
691 f: filter_map_f,
692 input: Box::new(self.ir_node.into_inner()),
693 metadata: self
694 .location
695 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
696 },
697 )
698 }
699
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; they remain grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
703 ///
704 /// # Example
705 /// ```rust
706 /// # #[cfg(feature = "deploy")] {
707 /// # use hydro_lang::prelude::*;
708 /// # use futures::StreamExt;
709 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
710 /// process
711 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
712 /// .into_keyed()
713 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
714 /// # .entries()
715 /// # }, |mut stream| async move {
716 /// // { 2: [2] }
717 /// # for w in vec![(2, 2)] {
718 /// # assert_eq!(stream.next().await.unwrap(), w);
719 /// # }
720 /// # }));
721 /// # }
722 /// ```
723 pub fn filter_map_with_key<U, F>(
724 self,
725 f: impl IntoQuotedMut<'a, F, L> + Copy,
726 ) -> KeyedStream<K, U, L, B, O, R>
727 where
728 F: Fn((K, V)) -> Option<U> + 'a,
729 K: Clone,
730 {
731 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
732 let filter_map_f = q!({
733 let orig = f;
734 move |(k, v)| {
735 let out = orig((Clone::clone(&k), v));
736 out.map(|o| (k, o))
737 }
738 })
739 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
740 .into();
741
742 KeyedStream::new(
743 self.location.clone(),
744 HydroNode::FilterMap {
745 f: filter_map_f,
746 input: Box::new(self.ir_node.into_inner()),
747 metadata: self
748 .location
749 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
750 },
751 )
752 }
753
    /// Generates a keyed stream that maps each value `v` in each group to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
757 ///
758 /// # Example
759 /// ```rust
760 /// # #[cfg(feature = "deploy")] {
761 /// # use hydro_lang::prelude::*;
762 /// # use futures::StreamExt;
763 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
764 /// let tick = process.tick();
765 /// let batch = process
766 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
767 /// .into_keyed()
768 /// .batch(&tick, nondet!(/** test */));
769 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
770 /// batch.cross_singleton(count).all_ticks().entries()
771 /// # }, |mut stream| async move {
772 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
773 /// # for w in vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))] {
774 /// # assert_eq!(stream.next().await.unwrap(), w);
775 /// # }
776 /// # }));
777 /// # }
778 /// ```
779 pub fn cross_singleton<O2>(
780 self,
781 other: impl Into<Optional<O2, L, Bounded>>,
782 ) -> KeyedStream<K, (V, O2), L, B, O, R>
783 where
784 O2: Clone,
785 {
786 let other: Optional<O2, L, Bounded> = other.into();
787 check_matching_location(&self.location, &other.location);
788
789 Stream::new(
790 self.location.clone(),
791 HydroNode::CrossSingleton {
792 left: Box::new(self.ir_node.into_inner()),
793 right: Box::new(other.ir_node.into_inner()),
794 metadata: self
795 .location
796 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
797 },
798 )
799 .map(q!(|((k, v), o2)| (k, (v, o2))))
800 .into_keyed()
801 }
802
803 /// For each value `v` in each group, transform `v` using `f` and then treat the
804 /// result as an [`Iterator`] to produce values one by one within the same group.
    /// The implementation of [`Iterator`] for the output type `I` must produce items
806 /// in a **deterministic** order.
807 ///
808 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
809 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
810 ///
811 /// # Example
812 /// ```rust
813 /// # #[cfg(feature = "deploy")] {
814 /// # use hydro_lang::prelude::*;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817 /// process
818 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
819 /// .into_keyed()
820 /// .flat_map_ordered(q!(|x| x))
821 /// # .entries()
822 /// # }, |mut stream| async move {
823 /// // { 1: [2, 3, 4], 2: [5, 6] }
824 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
825 /// # assert_eq!(stream.next().await.unwrap(), w);
826 /// # }
827 /// # }));
828 /// # }
829 /// ```
830 pub fn flat_map_ordered<U, I, F>(
831 self,
832 f: impl IntoQuotedMut<'a, F, L> + Copy,
833 ) -> KeyedStream<K, U, L, B, O, R>
834 where
835 I: IntoIterator<Item = U>,
836 F: Fn(V) -> I + 'a,
837 K: Clone,
838 {
839 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
840 let flat_map_f = q!({
841 let orig = f;
842 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
843 })
844 .splice_fn1_ctx::<(K, V), _>(&self.location)
845 .into();
846
847 KeyedStream::new(
848 self.location.clone(),
849 HydroNode::FlatMap {
850 f: flat_map_f,
851 input: Box::new(self.ir_node.into_inner()),
852 metadata: self
853 .location
854 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
855 },
856 )
857 }
858
859 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
860 /// for the output type `I` to produce items in any order.
861 ///
862 /// # Example
863 /// ```rust
864 /// # #[cfg(feature = "deploy")] {
865 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
866 /// # use futures::StreamExt;
867 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
868 /// process
869 /// .source_iter(q!(vec![
870 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
871 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
872 /// ]))
873 /// .into_keyed()
874 /// .flat_map_unordered(q!(|x| x))
875 /// # .entries()
876 /// # }, |mut stream| async move {
877 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
878 /// # let mut results = Vec::new();
879 /// # for _ in 0..4 {
880 /// # results.push(stream.next().await.unwrap());
881 /// # }
882 /// # results.sort();
883 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
884 /// # }));
885 /// # }
886 /// ```
887 pub fn flat_map_unordered<U, I, F>(
888 self,
889 f: impl IntoQuotedMut<'a, F, L> + Copy,
890 ) -> KeyedStream<K, U, L, B, NoOrder, R>
891 where
892 I: IntoIterator<Item = U>,
893 F: Fn(V) -> I + 'a,
894 K: Clone,
895 {
896 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
897 let flat_map_f = q!({
898 let orig = f;
899 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
900 })
901 .splice_fn1_ctx::<(K, V), _>(&self.location)
902 .into();
903
904 KeyedStream::new(
905 self.location.clone(),
906 HydroNode::FlatMap {
907 f: flat_map_f,
908 input: Box::new(self.ir_node.into_inner()),
909 metadata: self
910 .location
911 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
912 },
913 )
914 }
915
916 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
    /// within the same group. The implementation of [`Iterator`] for the value type `V` must produce
918 /// items in a **deterministic** order.
919 ///
920 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
921 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
922 ///
923 /// # Example
924 /// ```rust
925 /// # #[cfg(feature = "deploy")] {
926 /// # use hydro_lang::prelude::*;
927 /// # use futures::StreamExt;
928 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
929 /// process
930 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
931 /// .into_keyed()
932 /// .flatten_ordered()
933 /// # .entries()
934 /// # }, |mut stream| async move {
935 /// // { 1: [2, 3, 4], 2: [5, 6] }
936 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
937 /// # assert_eq!(stream.next().await.unwrap(), w);
938 /// # }
939 /// # }));
940 /// # }
941 /// ```
942 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
943 where
944 V: IntoIterator<Item = U>,
945 K: Clone,
946 {
947 self.flat_map_ordered(q!(|d| d))
948 }
949
950 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
951 /// for the value type `V` to produce items in any order.
952 ///
953 /// # Example
954 /// ```rust
955 /// # #[cfg(feature = "deploy")] {
956 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
957 /// # use futures::StreamExt;
958 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
959 /// process
960 /// .source_iter(q!(vec![
961 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
962 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
963 /// ]))
964 /// .into_keyed()
965 /// .flatten_unordered()
966 /// # .entries()
967 /// # }, |mut stream| async move {
968 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
969 /// # let mut results = Vec::new();
970 /// # for _ in 0..4 {
971 /// # results.push(stream.next().await.unwrap());
972 /// # }
973 /// # results.sort();
974 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
975 /// # }));
976 /// # }
977 /// ```
978 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
979 where
980 V: IntoIterator<Item = U>,
981 K: Clone,
982 {
983 self.flat_map_unordered(q!(|d| d))
984 }
985
986 /// An operator which allows you to "inspect" each element of a stream without
987 /// modifying it. The closure `f` is called on a reference to each value. This is
988 /// mainly useful for debugging, and should not be used to generate side-effects.
989 ///
990 /// # Example
991 /// ```rust
992 /// # #[cfg(feature = "deploy")] {
993 /// # use hydro_lang::prelude::*;
994 /// # use futures::StreamExt;
995 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996 /// process
997 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
998 /// .into_keyed()
999 /// .inspect(q!(|v| println!("{}", v)))
1000 /// # .entries()
1001 /// # }, |mut stream| async move {
1002 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1003 /// # assert_eq!(stream.next().await.unwrap(), w);
1004 /// # }
1005 /// # }));
1006 /// # }
1007 /// ```
1008 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1009 where
1010 F: Fn(&V) + 'a,
1011 {
1012 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1013 let inspect_f = q!({
1014 let orig = f;
1015 move |t: &(_, _)| orig(&t.1)
1016 })
1017 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1018 .into();
1019
1020 KeyedStream::new(
1021 self.location.clone(),
1022 HydroNode::Inspect {
1023 f: inspect_f,
1024 input: Box::new(self.ir_node.into_inner()),
1025 metadata: self.location.new_node_metadata(Self::collection_kind()),
1026 },
1027 )
1028 }
1029
1030 /// An operator which allows you to "inspect" each element of a stream without
1031 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1032 /// mainly useful for debugging, and should not be used to generate side-effects.
1033 ///
1034 /// # Example
1035 /// ```rust
1036 /// # #[cfg(feature = "deploy")] {
1037 /// # use hydro_lang::prelude::*;
1038 /// # use futures::StreamExt;
1039 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1040 /// process
1041 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1042 /// .into_keyed()
1043 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1044 /// # .entries()
1045 /// # }, |mut stream| async move {
1046 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1047 /// # assert_eq!(stream.next().await.unwrap(), w);
1048 /// # }
1049 /// # }));
1050 /// # }
1051 /// ```
1052 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1053 where
1054 F: Fn(&(K, V)) + 'a,
1055 {
1056 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1057
1058 KeyedStream::new(
1059 self.location.clone(),
1060 HydroNode::Inspect {
1061 f: inspect_f,
1062 input: Box::new(self.ir_node.into_inner()),
1063 metadata: self.location.new_node_metadata(Self::collection_kind()),
1064 },
1065 )
1066 }
1067
1068 /// An operator which allows you to "name" a `HydroNode`.
1069 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1070 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1071 {
1072 let mut node = self.ir_node.borrow_mut();
1073 let metadata = node.metadata_mut();
1074 metadata.tag = Some(name.to_string());
1075 }
1076 self
1077 }
1078}
1079
1080impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1081 KeyedStream<(K1, K2), V, L, B, O, R>
1082{
1083 /// Produces a new keyed stream by dropping the first element of the compound key.
1084 ///
1085 /// Because multiple keys may share the same suffix, this operation results in re-grouping
1086 /// of the values under the new keys. The values across groups with the same new key
1087 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1088 ///
1089 /// # Example
1090 /// ```rust
1091 /// # #[cfg(feature = "deploy")] {
1092 /// # use hydro_lang::prelude::*;
1093 /// # use futures::StreamExt;
1094 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095 /// process
1096 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1097 /// .into_keyed()
1098 /// .drop_key_prefix()
1099 /// # .entries()
1100 /// # }, |mut stream| async move {
1101 /// // { 10: [2, 3], 20: [4] }
1102 /// # for w in vec![(10, 2), (10, 3), (20, 4)] {
1103 /// # assert_eq!(stream.next().await.unwrap(), w);
1104 /// # }
1105 /// # }));
1106 /// # }
1107 /// ```
1108 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1109 self.entries()
1110 .map(q!(|((_k1, k2), v)| (k2, v)))
1111 .into_keyed()
1112 }
1113}
1114
1115impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1116 KeyedStream<K, V, L, Unbounded, O, R>
1117{
1118 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1119 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`], as
    /// sketched after the example below.
1122 ///
1123 /// Currently, both input streams must be [`Unbounded`].
1124 ///
1125 /// # Example
1126 /// ```rust
1127 /// # #[cfg(feature = "deploy")] {
1128 /// # use hydro_lang::prelude::*;
1129 /// # use futures::StreamExt;
1130 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1131 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
1132 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
1133 /// numbers1.interleave(numbers2)
1134 /// # .entries()
1135 /// # }, |mut stream| async move {
1136 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1137 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
1138 /// # assert_eq!(stream.next().await.unwrap(), w);
1139 /// # }
1140 /// # }));
1141 /// # }
1142 /// ```
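    ///
    /// If the key spaces of the two inputs are known to be disjoint, the result can be cast back
    /// to [`TotalOrder`] (a sketch with an illustrative justification, not a verified doctest):
    /// ```rust,ignore
    /// let merged = numbers1
    ///     .interleave(numbers2)
    ///     .assume_ordering::<TotalOrder>(nondet!(/** the two inputs have disjoint keys */));
    /// ```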
1143 pub fn interleave<O2: Ordering, R2: Retries>(
1144 self,
1145 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1146 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1147 where
1148 R: MinRetries<R2>,
1149 {
1150 let tick = self.location.tick();
1151 // Because the outputs are unordered, we can interleave batches from both streams.
1152 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1153 self.batch(&tick, nondet_batch_interleaving)
1154 .weakest_ordering()
1155 .chain(
1156 other
1157 .batch(&tick, nondet_batch_interleaving)
1158 .weakest_ordering(),
1159 )
1160 .all_ticks()
1161 }
1162}
1163
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield
/// elements and control the processing of future elements.
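///
/// # Example
/// A minimal illustrative closure (mirroring the [`KeyedStream::generator`] doctest below, not
/// independently verified) that yields even running sums and returns once the sum exceeds 100:
/// ```rust,ignore
/// q!(|acc: &mut i32, x: i32| {
///     *acc += x;
///     if *acc > 100 {
///         Generate::Return(*acc) // emit a final element, then stop processing this key
///     } else if *acc % 2 == 0 {
///         Generate::Yield(*acc) // emit and keep processing
///     } else {
///         Generate::Continue // emit nothing, keep processing
///     }
/// })
/// ```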
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element; do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1176
1177impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1178where
1179 L: Location<'a>,
1180{
    /// A special case of [`Stream::scan`] for keyed streams. Within each key group, the values are
    /// transformed via the `f` combinator.
    ///
    /// Unlike [`Stream::fold_keyed`], which only returns the final accumulated value, `scan` produces
    /// a new stream containing all intermediate accumulated values paired with the key. The scan
    /// operation can also terminate early by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, that key's group is terminated and no further elements are
    /// processed for that key.
1190 ///
1191 /// # Example
1192 /// ```rust
1193 /// # #[cfg(feature = "deploy")] {
1194 /// # use hydro_lang::prelude::*;
1195 /// # use futures::StreamExt;
1196 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1197 /// process
1198 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1199 /// .into_keyed()
1200 /// .scan(
1201 /// q!(|| 0),
1202 /// q!(|acc, x| {
1203 /// *acc += x;
1204 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1205 /// }),
1206 /// )
1207 /// # .entries()
1208 /// # }, |mut stream| async move {
1209 /// // Output: { 0: [1], 1: [3, 7] }
1210 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1211 /// # assert_eq!(stream.next().await.unwrap(), w);
1212 /// # }
1213 /// # }));
1214 /// # }
1215 /// ```
1216 pub fn scan<A, U, I, F>(
1217 self,
1218 init: impl IntoQuotedMut<'a, I, L> + Copy,
1219 f: impl IntoQuotedMut<'a, F, L> + Copy,
1220 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1221 where
1222 K: Clone + Eq + Hash,
1223 I: Fn() -> A + 'a,
1224 F: Fn(&mut A, V) -> Option<U> + 'a,
1225 {
1226 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1227 self.generator(
1228 init,
1229 q!({
1230 let orig = f;
1231 move |state, v| {
1232 if let Some(out) = orig(state, v) {
1233 Generate::Yield(out)
1234 } else {
1235 Generate::Break
1236 }
1237 }
1238 }),
1239 )
1240 }
1241
1242 /// Iteratively processes the elements in each group using a state machine that can yield
1243 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1244 /// syntax in Rust, without requiring special syntax.
1245 ///
1246 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1247 /// state for each group. The second argument defines the processing logic, taking in a
1248 /// mutable reference to the group's state and the value to be processed. It emits a
1249 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1250 /// should be processed.
1251 ///
1252 /// # Example
1253 /// ```rust
1254 /// # #[cfg(feature = "deploy")] {
1255 /// # use hydro_lang::prelude::*;
1256 /// # use futures::StreamExt;
1257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1258 /// process
1259 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1260 /// .into_keyed()
1261 /// .generator(
1262 /// q!(|| 0),
1263 /// q!(|acc, x| {
1264 /// *acc += x;
1265 /// if *acc > 100 {
1266 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1267 /// "done!".to_string()
1268 /// )
1269 /// } else if *acc % 2 == 0 {
1270 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1271 /// "even".to_string()
1272 /// )
1273 /// } else {
1274 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1275 /// }
1276 /// }),
1277 /// )
1278 /// # .entries()
1279 /// # }, |mut stream| async move {
1280 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1281 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1282 /// # assert_eq!(stream.next().await.unwrap(), w);
1283 /// # }
1284 /// # }));
1285 /// # }
1286 /// ```
1287 pub fn generator<A, U, I, F>(
1288 self,
1289 init: impl IntoQuotedMut<'a, I, L> + Copy,
1290 f: impl IntoQuotedMut<'a, F, L> + Copy,
1291 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1292 where
1293 K: Clone + Eq + Hash,
1294 I: Fn() -> A + 'a,
1295 F: Fn(&mut A, V) -> Generate<U> + 'a,
1296 {
1297 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1298 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1299
1300 let scan_init = q!(|| HashMap::new())
1301 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1302 .into();
1303 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1304 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1305 if let Some(existing_state_value) = existing_state {
1306 match f(existing_state_value, v) {
1307 Generate::Yield(out) => Some(Some((k, out))),
1308 Generate::Return(out) => {
1309 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1310 Some(Some((k, out)))
1311 }
1312 Generate::Break => {
1313 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1314 Some(None)
1315 }
1316 Generate::Continue => Some(None),
1317 }
1318 } else {
1319 Some(None)
1320 }
1321 })
1322 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1323 .into();
1324
1325 let scan_node = HydroNode::Scan {
1326 init: scan_init,
1327 acc: scan_f,
1328 input: Box::new(self.ir_node.into_inner()),
1329 metadata: self.location.new_node_metadata(Stream::<
1330 Option<(K, U)>,
1331 L,
1332 B,
1333 TotalOrder,
1334 ExactlyOnce,
1335 >::collection_kind()),
1336 };
1337
1338 let flatten_f = q!(|d| d)
1339 .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1340 .into();
1341 let flatten_node = HydroNode::FlatMap {
1342 f: flatten_f,
1343 input: Box::new(scan_node),
1344 metadata: self.location.new_node_metadata(KeyedStream::<
1345 K,
1346 U,
1347 L,
1348 B,
1349 TotalOrder,
1350 ExactlyOnce,
1351 >::collection_kind()),
1352 };
1353
1354 KeyedStream::new(self.location.clone(), flatten_node)
1355 }
1356
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in order across the values in each group, but the aggregation function returns a boolean;
    /// when it returns `true`, the aggregated result for that key is complete and is released to
    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
1363 ///
1364 /// # Example
1365 /// ```rust
1366 /// # #[cfg(feature = "deploy")] {
1367 /// # use hydro_lang::prelude::*;
1368 /// # use futures::StreamExt;
1369 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1370 /// process
1371 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1372 /// .into_keyed()
1373 /// .fold_early_stop(
1374 /// q!(|| 0),
1375 /// q!(|acc, x| {
1376 /// *acc += x;
1377 /// x % 2 == 0
1378 /// }),
1379 /// )
1380 /// # .entries()
1381 /// # }, |mut stream| async move {
1382 /// // Output: { 0: 2, 1: 9 }
1383 /// # for w in vec![(0, 2), (1, 9)] {
1384 /// # assert_eq!(stream.next().await.unwrap(), w);
1385 /// # }
1386 /// # }));
1387 /// # }
1388 /// ```
1389 pub fn fold_early_stop<A, I, F>(
1390 self,
1391 init: impl IntoQuotedMut<'a, I, L> + Copy,
1392 f: impl IntoQuotedMut<'a, F, L> + Copy,
1393 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1394 where
1395 K: Clone + Eq + Hash,
1396 I: Fn() -> A + 'a,
1397 F: Fn(&mut A, V) -> bool + 'a,
1398 {
1399 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1400 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1401 let out_without_bound_cast = self.generator(
1402 q!(move || Some(init())),
1403 q!(move |key_state, v| {
1404 if let Some(key_state_value) = key_state.as_mut() {
1405 if f(key_state_value, v) {
1406 Generate::Return(key_state.take().unwrap())
1407 } else {
1408 Generate::Continue
1409 }
1410 } else {
1411 unreachable!()
1412 }
1413 }),
1414 );
1415
1416 KeyedSingleton::new(
1417 out_without_bound_cast.location.clone(),
1418 HydroNode::Cast {
1419 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1420 metadata: out_without_bound_cast
1421 .location
1422 .new_node_metadata(
1423 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1424 ),
1425 },
1426 )
1427 }
1428
1429 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1430 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1431 /// otherwise the first element would be non-deterministic.
1432 ///
1433 /// # Example
1434 /// ```rust
1435 /// # #[cfg(feature = "deploy")] {
1436 /// # use hydro_lang::prelude::*;
1437 /// # use futures::StreamExt;
1438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439 /// process
1440 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1441 /// .into_keyed()
1442 /// .first()
1443 /// # .entries()
1444 /// # }, |mut stream| async move {
1445 /// // Output: { 0: 2, 1: 3 }
1446 /// # for w in vec![(0, 2), (1, 3)] {
1447 /// # assert_eq!(stream.next().await.unwrap(), w);
1448 /// # }
1449 /// # }));
1450 /// # }
1451 /// ```
1452 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1453 where
1454 K: Clone + Eq + Hash,
1455 {
1456 self.fold_early_stop(
1457 q!(|| None),
1458 q!(|acc, v| {
1459 *acc = Some(v);
1460 true
1461 }),
1462 )
1463 .map(q!(|v| v.unwrap()))
1464 }
1465
1466 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1467 ///
1468 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1469 /// to depend on the order of elements in the group.
1470 ///
1471 /// If the input and output value types are the same and do not require initialization then use
1472 /// [`KeyedStream::reduce`].
1473 ///
1474 /// # Example
1475 /// ```rust
1476 /// # #[cfg(feature = "deploy")] {
1477 /// # use hydro_lang::prelude::*;
1478 /// # use futures::StreamExt;
1479 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1480 /// let tick = process.tick();
1481 /// let numbers = process
1482 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1483 /// .into_keyed();
1484 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1485 /// batch
1486 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
1487 /// .entries()
1488 /// .all_ticks()
1489 /// # }, |mut stream| async move {
1490 /// // (1, 5), (2, 7)
1491 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1492 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1493 /// # }));
1494 /// # }
1495 /// ```
1496 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1497 self,
1498 init: impl IntoQuotedMut<'a, I, L>,
1499 comb: impl IntoQuotedMut<'a, F, L>,
1500 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1501 where
1502 K: Eq + Hash,
1503 {
1504 let init = init.splice_fn0_ctx(&self.location).into();
1505 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1506
1507 KeyedSingleton::new(
1508 self.location.clone(),
1509 HydroNode::FoldKeyed {
1510 init,
1511 acc: comb,
1512 input: Box::new(self.ir_node.into_inner()),
1513 metadata: self.location.new_node_metadata(KeyedSingleton::<
1514 K,
1515 A,
1516 L,
1517 B::WhenValueUnbounded,
1518 >::collection_kind()),
1519 },
1520 )
1521 }
1522
1523 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1524 ///
1525 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1526 /// to depend on the order of elements in the stream.
1527 ///
1528 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1529 ///
1530 /// # Example
1531 /// ```rust
1532 /// # #[cfg(feature = "deploy")] {
1533 /// # use hydro_lang::prelude::*;
1534 /// # use futures::StreamExt;
1535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1536 /// let tick = process.tick();
1537 /// let numbers = process
1538 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1539 /// .into_keyed();
1540 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1541 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1542 /// # }, |mut stream| async move {
1543 /// // (1, 5), (2, 7)
1544 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1545 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1546 /// # }));
1547 /// # }
1548 /// ```
1549 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1550 self,
1551 comb: impl IntoQuotedMut<'a, F, L>,
1552 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1553 where
1554 K: Eq + Hash,
1555 {
1556 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1557
1558 KeyedSingleton::new(
1559 self.location.clone(),
1560 HydroNode::ReduceKeyed {
1561 f,
1562 input: Box::new(self.ir_node.into_inner()),
1563 metadata: self.location.new_node_metadata(KeyedSingleton::<
1564 K,
1565 V,
1566 L,
1567 B::WhenValueUnbounded,
1568 >::collection_kind()),
1569 },
1570 )
1571 }
1572
1573 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1574 ///
1575 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1576 /// to depend on the order of elements in the stream.
1577 ///
1578 /// # Example
1579 /// ```rust
1580 /// # #[cfg(feature = "deploy")] {
1581 /// # use hydro_lang::prelude::*;
1582 /// # use futures::StreamExt;
1583 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1584 /// let tick = process.tick();
1585 /// let watermark = tick.singleton(q!(1));
1586 /// let numbers = process
1587 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1588 /// .into_keyed();
1589 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1590 /// batch
1591 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1592 /// .entries()
1593 /// .all_ticks()
1594 /// # }, |mut stream| async move {
1595 /// // (2, 204)
1596 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1597 /// # }));
1598 /// # }
1599 /// ```
1600 pub fn reduce_watermark<O, F>(
1601 self,
1602 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1603 comb: impl IntoQuotedMut<'a, F, L>,
1604 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1605 where
1606 K: Eq + Hash,
1607 O: Clone,
1608 F: Fn(&mut V, V) + 'a,
1609 {
1610 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1611 check_matching_location(&self.location.root(), other.location.outer());
1612 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1613
1614 KeyedSingleton::new(
1615 self.location.clone(),
1616 HydroNode::ReduceKeyedWatermark {
1617 f,
1618 input: Box::new(self.ir_node.into_inner()),
1619 watermark: Box::new(other.ir_node.into_inner()),
1620 metadata: self.location.new_node_metadata(KeyedSingleton::<
1621 K,
1622 V,
1623 L,
1624 B::WhenValueUnbounded,
1625 >::collection_kind()),
1626 },
1627 )
1628 }
1629}
1630
1631impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1632where
1633 L: Location<'a>,
1634{
1635 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1636 ///
1637 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1638 ///
1639 /// If the input and output value types are the same and do not require initialization then use
1640 /// [`KeyedStream::reduce_commutative`].
1641 ///
1642 /// # Example
1643 /// ```rust
1644 /// # #[cfg(feature = "deploy")] {
1645 /// # use hydro_lang::prelude::*;
1646 /// # use futures::StreamExt;
1647 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1648 /// let tick = process.tick();
1649 /// let numbers = process
1650 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1651 /// .into_keyed();
1652 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1653 /// batch
1654 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1655 /// .entries()
1656 /// .all_ticks()
1657 /// # }, |mut stream| async move {
1658 /// // (1, 5), (2, 7)
1659 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1660 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1661 /// # }));
1662 /// # }
1663 /// ```
1664 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1665 self,
1666 init: impl IntoQuotedMut<'a, I, L>,
1667 comb: impl IntoQuotedMut<'a, F, L>,
1668 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1669 where
1670 K: Eq + Hash,
1671 {
1672 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1673 .fold(init, comb)
1674 }
1675
1676 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1677 ///
1678 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1679 ///
1680 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1681 ///
1682 /// # Example
1683 /// ```rust
1684 /// # #[cfg(feature = "deploy")] {
1685 /// # use hydro_lang::prelude::*;
1686 /// # use futures::StreamExt;
1687 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1688 /// let tick = process.tick();
1689 /// let numbers = process
1690 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1691 /// .into_keyed();
1692 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1693 /// batch
1694 /// .reduce_commutative(q!(|acc, x| *acc += x))
1695 /// .entries()
1696 /// .all_ticks()
1697 /// # }, |mut stream| async move {
1698 /// // (1, 5), (2, 7)
1699 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1700 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1701 /// # }));
1702 /// # }
1703 /// ```
1704 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1705 self,
1706 comb: impl IntoQuotedMut<'a, F, L>,
1707 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1708 where
1709 K: Eq + Hash,
1710 {
1711 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1712 .reduce(comb)
1713 }
1714
1715 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1716 ///
1717 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1718 ///
1719 /// # Example
1720 /// ```rust
1721 /// # #[cfg(feature = "deploy")] {
1722 /// # use hydro_lang::prelude::*;
1723 /// # use futures::StreamExt;
1724 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1725 /// let tick = process.tick();
1726 /// let watermark = tick.singleton(q!(1));
1727 /// let numbers = process
1728 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1729 /// .into_keyed();
1730 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1731 /// batch
1732 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1733 /// .entries()
1734 /// .all_ticks()
1735 /// # }, |mut stream| async move {
1736 /// // (2, 204)
1737 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1738 /// # }));
1739 /// # }
1740 /// ```
1741 pub fn reduce_watermark_commutative<O2, F>(
1742 self,
1743 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1744 comb: impl IntoQuotedMut<'a, F, L>,
1745 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1746 where
1747 K: Eq + Hash,
1748 O2: Clone,
1749 F: Fn(&mut V, V) + 'a,
1750 {
1751 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1752 .reduce_watermark(other, comb)
1753 }
1754
1755 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1756 ///
1757 /// # Example
1758 /// ```rust
1759 /// # #[cfg(feature = "deploy")] {
1760 /// # use hydro_lang::prelude::*;
1761 /// # use futures::StreamExt;
1762 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1763 /// let tick = process.tick();
1764 /// let numbers = process
1765 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1766 /// .into_keyed();
1767 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1768 /// batch
1769 /// .value_counts()
1770 /// .entries()
1771 /// .all_ticks()
1772 /// # }, |mut stream| async move {
1773 /// // (1, 3), (2, 2)
1774 /// # assert_eq!(stream.next().await.unwrap(), (1, 3));
1775 /// # assert_eq!(stream.next().await.unwrap(), (2, 2));
1776 /// # }));
1777 /// # }
1778 /// ```
1779 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1780 where
1781 K: Eq + Hash,
1782 {
1783 self.assume_ordering_trusted(
1784 nondet!(/** ordering within each group affects neither result nor intermediates */),
1785 )
1786 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1787 }
1788}
1789
1790impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1791where
1792 L: Location<'a>,
1793{
1794 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1795 ///
1796 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1797 ///
1798 /// If the input and output value types are the same and do not require initialization, then use
1799 /// [`KeyedStream::reduce_idempotent`].
1800 ///
1801 /// # Example
1802 /// ```rust
1803 /// # #[cfg(feature = "deploy")] {
1804 /// # use hydro_lang::prelude::*;
1805 /// # use futures::StreamExt;
1806 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1807 /// let tick = process.tick();
1808 /// let numbers = process
1809 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1810 /// .into_keyed();
1811 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1812 /// batch
1813 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1814 /// .entries()
1815 /// .all_ticks()
1816 /// # }, |mut stream| async move {
1817 /// // (1, false), (2, true)
1818 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1819 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1820 /// # }));
1821 /// # }
1822 /// ```
1823 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1824 self,
1825 init: impl IntoQuotedMut<'a, I, L>,
1826 comb: impl IntoQuotedMut<'a, F, L>,
1827 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1828 where
1829 K: Eq + Hash,
1830 {
1831 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1832 .fold(init, comb)
1833 }
1834
1835 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1836 ///
1837 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1838 ///
1839 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1840 ///
1841 /// # Example
1842 /// ```rust
1843 /// # #[cfg(feature = "deploy")] {
1844 /// # use hydro_lang::prelude::*;
1845 /// # use futures::StreamExt;
1846 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1847 /// let tick = process.tick();
1848 /// let numbers = process
1849 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1850 /// .into_keyed();
1851 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1852 /// batch
1853 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1854 /// .entries()
1855 /// .all_ticks()
1856 /// # }, |mut stream| async move {
1857 /// // (1, false), (2, true)
1858 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1859 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1860 /// # }));
1861 /// # }
1862 /// ```
1863 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1864 self,
1865 comb: impl IntoQuotedMut<'a, F, L>,
1866 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1867 where
1868 K: Eq + Hash,
1869 {
1870 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1871 .reduce(comb)
1872 }
1873
1874 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1875 ///
1876 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1877 ///
1878 /// # Example
1879 /// ```rust
1880 /// # #[cfg(feature = "deploy")] {
1881 /// # use hydro_lang::prelude::*;
1882 /// # use futures::StreamExt;
1883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1884 /// let tick = process.tick();
1885 /// let watermark = tick.singleton(q!(1));
1886 /// let numbers = process
1887 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1888 /// .into_keyed();
1889 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1890 /// batch
1891 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1892 /// .entries()
1893 /// .all_ticks()
1894 /// # }, |mut stream| async move {
1895 /// // (2, true)
1896 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1897 /// # }));
1898 /// # }
1899 /// ```
1900 pub fn reduce_watermark_idempotent<O2, F>(
1901 self,
1902 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1903 comb: impl IntoQuotedMut<'a, F, L>,
1904 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1905 where
1906 K: Eq + Hash,
1907 O2: Clone,
1908 F: Fn(&mut V, V) + 'a,
1909 {
1910 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1911 .reduce_watermark(other, comb)
1912 }
1913}
1914
1915impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1916where
1917 L: Location<'a>,
1918{
1919 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1920 ///
1921 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1922 /// as there may be non-deterministic duplicates.
1923 ///
1924 /// If the input and output value types are the same and do not require initialization, then use
1925 /// [`KeyedStream::reduce_commutative_idempotent`].
1926 ///
1927 /// # Example
1928 /// ```rust
1929 /// # #[cfg(feature = "deploy")] {
1930 /// # use hydro_lang::prelude::*;
1931 /// # use futures::StreamExt;
1932 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1933 /// let tick = process.tick();
1934 /// let numbers = process
1935 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1936 /// .into_keyed();
1937 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1938 /// batch
1939 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1940 /// .entries()
1941 /// .all_ticks()
1942 /// # }, |mut stream| async move {
1943 /// // (1, false), (2, true)
1944 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1945 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1946 /// # }));
1947 /// # }
1948 /// ```
1949 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1950 self,
1951 init: impl IntoQuotedMut<'a, I, L>,
1952 comb: impl IntoQuotedMut<'a, F, L>,
1953 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1954 where
1955 K: Eq + Hash,
1956 {
1957 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1958 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1959 .fold(init, comb)
1960 }
1961
1962 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1963 ///
1964 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1965 /// as there may be non-deterministic duplicates.
1966 ///
1967 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1968 ///
1969 /// # Example
1970 /// ```rust
1971 /// # #[cfg(feature = "deploy")] {
1972 /// # use hydro_lang::prelude::*;
1973 /// # use futures::StreamExt;
1974 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1975 /// let tick = process.tick();
1976 /// let numbers = process
1977 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1978 /// .into_keyed();
1979 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1980 /// batch
1981 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1982 /// .entries()
1983 /// .all_ticks()
1984 /// # }, |mut stream| async move {
1985 /// // (1, false), (2, true)
1986 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1987 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1988 /// # }));
1989 /// # }
1990 /// ```
1991 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1992 self,
1993 comb: impl IntoQuotedMut<'a, F, L>,
1994 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1995 where
1996 K: Eq + Hash,
1997 {
1998 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1999 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2000 .reduce(comb)
2001 }
2002
2003 /// A special case of [`KeyedStream::reduce_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
2004 ///
2005 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2006 /// as there may be non-deterministic duplicates.
2007 ///
2008 /// # Example
2009 /// ```rust
2010 /// # #[cfg(feature = "deploy")] {
2011 /// # use hydro_lang::prelude::*;
2012 /// # use futures::StreamExt;
2013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2014 /// let tick = process.tick();
2015 /// let watermark = tick.singleton(q!(1));
2016 /// let numbers = process
2017 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
2018 /// .into_keyed();
2019 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2020 /// batch
2021 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
2022 /// .entries()
2023 /// .all_ticks()
2024 /// # }, |mut stream| async move {
2025 /// // (2, true)
2026 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2027 /// # }));
2028 /// # }
2029 /// ```
2030 pub fn reduce_watermark_commutative_idempotent<O2, F>(
2031 self,
2032 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
2033 comb: impl IntoQuotedMut<'a, F, L>,
2034 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
2035 where
2036 K: Eq + Hash,
2037 O2: Clone,
2038 F: Fn(&mut V, V) + 'a,
2039 {
2040 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
2041 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2042 .reduce_watermark(other, comb)
2043 }
2044
2045 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2046 /// whose keys are not in the bounded stream.
2047 ///
2048 /// # Example
2049 /// ```rust
2050 /// # #[cfg(feature = "deploy")] {
2051 /// # use hydro_lang::prelude::*;
2052 /// # use futures::StreamExt;
2053 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2054 /// let tick = process.tick();
2055 /// let keyed_stream = process
2056 /// .source_iter(q!(vec![(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]))
2057 /// .batch(&tick, nondet!(/** test */))
2058 /// .into_keyed();
2059 /// let keys_to_remove = process
2060 /// .source_iter(q!(vec![1, 2]))
2061 /// .batch(&tick, nondet!(/** test */));
2062 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2063 /// # .entries()
2064 /// # }, |mut stream| async move {
2065 /// // { 3: ['c'], 4: ['d'] }
2066 /// # for w in vec![(3, 'c'), (4, 'd')] {
2067 /// # assert_eq!(stream.next().await.unwrap(), w);
2068 /// # }
2069 /// # }));
2070 /// # }
2071 /// ```
2072 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2073 self,
2074 other: Stream<K, L, Bounded, O2, R2>,
2075 ) -> Self
2076 where
2077 K: Eq + Hash,
2078 {
2079 check_matching_location(&self.location, &other.location);
2080
2081 KeyedStream::new(
2082 self.location.clone(),
2083 HydroNode::AntiJoin {
2084 pos: Box::new(self.ir_node.into_inner()),
2085 neg: Box::new(other.ir_node.into_inner()),
2086 metadata: self.location.new_node_metadata(Self::collection_kind()),
2087 },
2088 )
2089 }
2090}
2091
2092impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
2093where
2094 L: Location<'a>,
2095{
2096 /// Shifts this keyed stream into an atomic context, which guarantees that all downstream logic
2097 /// will be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2098 ///
2099 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2100 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2101 /// argument that declares where the stream will be atomically processed. Batching a stream into
2102 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2103 /// [`Tick`] will introduce asynchrony.
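///
/// # Example
/// A minimal sketch, reusing the test harness from the other examples in this module and
/// assuming the pass-through of the atomic section preserves the input order of entries:
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process
///     .source_iter(q!(vec![(1, 2), (2, 3)]))
///     .into_keyed();
/// // all logic between `atomic` and `end_atomic` is processed synchronously
/// numbers.atomic(&tick).end_atomic().entries()
/// # }, |mut stream| async move {
/// // (1, 2), (2, 3)
/// # assert_eq!(stream.next().await.unwrap(), (1, 2));
/// # assert_eq!(stream.next().await.unwrap(), (2, 3));
/// # }));
/// # }
/// ```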
2104 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2105 let out_location = Atomic { tick: tick.clone() };
2106 KeyedStream::new(
2107 out_location.clone(),
2108 HydroNode::BeginAtomic {
2109 inner: Box::new(self.ir_node.into_inner()),
2110 metadata: out_location
2111 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2112 },
2113 )
2114 }
2115
2116 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2117 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2118 /// the order of the input.
2119 ///
2120 /// # Non-Determinism
2121 /// The batch boundaries are non-deterministic and may change across executions.
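///
/// # Example
/// An illustrative sketch; the batch boundaries are non-deterministic, but because batches are
/// contiguous and order-preserving, concatenating them with [`KeyedStream::all_ticks`] recovers
/// the input order:
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process
///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
///     .into_keyed();
/// // each tick sees one non-deterministic batch of the input
/// numbers
///     .batch(&tick, nondet!(/** test */))
///     .all_ticks()
///     .entries()
/// # }, |mut stream| async move {
/// // (1, 2), (1, 3), (2, 4)
/// # for w in vec![(1, 2), (1, 3), (2, 4)] {
/// #     assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// # }
/// ```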
2122 pub fn batch(
2123 self,
2124 tick: &Tick<L>,
2125 nondet: NonDet,
2126 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2127 let _ = nondet;
2128 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2129 KeyedStream::new(
2130 tick.clone(),
2131 HydroNode::Batch {
2132 inner: Box::new(self.ir_node.into_inner()),
2133 metadata: tick.new_node_metadata(
2134 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2135 ),
2136 },
2137 )
2138 }
2139}
2140
2141impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2142where
2143 L: Location<'a> + NoTick,
2144{
2145 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2146 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2147 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2148 /// used to create the atomic section.
2149 ///
2150 /// # Non-Determinism
2151 /// The batch boundaries are non-deterministic and may change across executions.
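///
/// # Example
/// A small sketch, assuming the atomic section was entered with [`KeyedStream::atomic`] so that
/// [`KeyedStream::batch_atomic`] yields batches in that same [`Tick`]:
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process
///     .source_iter(q!(vec![(1, 2), (2, 3)]))
///     .into_keyed();
/// numbers
///     .atomic(&tick)
///     .batch_atomic(nondet!(/** test */))
///     .all_ticks()
///     .entries()
/// # }, |mut stream| async move {
/// // (1, 2), (2, 3)
/// # assert_eq!(stream.next().await.unwrap(), (1, 2));
/// # assert_eq!(stream.next().await.unwrap(), (2, 3));
/// # }));
/// # }
/// ```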
2152 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2153 let _ = nondet;
2154 KeyedStream::new(
2155 self.location.clone().tick,
2156 HydroNode::Batch {
2157 inner: Box::new(self.ir_node.into_inner()),
2158 metadata: self.location.tick.new_node_metadata(KeyedStream::<
2159 K,
2160 V,
2161 Tick<L>,
2162 Bounded,
2163 O,
2164 R,
2165 >::collection_kind(
2166 )),
2167 },
2168 )
2169 }
2170
2171 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2172 /// See [`KeyedStream::atomic`] for more details.
2173 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2174 KeyedStream::new(
2175 self.location.tick.l.clone(),
2176 HydroNode::EndAtomic {
2177 inner: Box::new(self.ir_node.into_inner()),
2178 metadata: self
2179 .location
2180 .tick
2181 .l
2182 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2183 },
2184 )
2185 }
2186}
2187
2188impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
2189where
2190 L: Location<'a>,
2191{
2192 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2193 /// elements of the `self` stream and then emitting the elements of the `other` stream (if a key
2194 /// is only present in one of the inputs, its values are passed through as-is). The output has
2195 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2196 ///
2197 /// Currently, both input streams must be [`Bounded`]. This operator will block
2198 /// on the first stream until all its elements are available. In a future version,
2199 /// we will relax the requirement on the `other` stream.
2200 ///
2201 /// # Example
2202 /// ```rust
2203 /// # #[cfg(feature = "deploy")] {
2204 /// # use hydro_lang::prelude::*;
2205 /// # use futures::StreamExt;
2206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207 /// let tick = process.tick();
2208 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2209 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2210 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2211 /// # .entries()
2212 /// # }, |mut stream| async move {
2213 /// // { 0: [2, 1], 1: [4, 3] }
2214 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
2215 /// # assert_eq!(stream.next().await.unwrap(), w);
2216 /// # }
2217 /// # }));
2218 /// # }
2219 /// ```
2220 pub fn chain<O2: Ordering, R2: Retries>(
2221 self,
2222 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2223 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2224 where
2225 O: MinOrder<O2>,
2226 R: MinRetries<R2>,
2227 {
2228 check_matching_location(&self.location, &other.location);
2229
2230 KeyedStream::new(
2231 self.location.clone(),
2232 HydroNode::Chain {
2233 first: Box::new(self.ir_node.into_inner()),
2234 second: Box::new(other.ir_node.into_inner()),
2235 metadata: self.location.new_node_metadata(KeyedStream::<
2236 K,
2237 V,
2238 L,
2239 Bounded,
2240 <O as MinOrder<O2>>::Min,
2241 <R as MinRetries<R2>>::Min,
2242 >::collection_kind()),
2243 },
2244 )
2245 }
2246}
2247
2248impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2249where
2250 L: Location<'a>,
2251{
2252 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2253 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2254 /// each key.
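///
/// # Example
/// An illustrative sketch that transforms each batch inside the tick and then concatenates the
/// batches back into an unbounded keyed stream:
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process
///     .source_iter(q!(vec![(1, 1), (2, 2)]))
///     .into_keyed();
/// let batch = numbers.batch(&tick, nondet!(/** test */));
/// batch.map(q!(|v| v * 10)).all_ticks().entries()
/// # }, |mut stream| async move {
/// // (1, 10), (2, 20)
/// # assert_eq!(stream.next().await.unwrap(), (1, 10));
/// # assert_eq!(stream.next().await.unwrap(), (2, 20));
/// # }));
/// # }
/// ```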
2255 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2256 KeyedStream::new(
2257 self.location.outer().clone(),
2258 HydroNode::YieldConcat {
2259 inner: Box::new(self.ir_node.into_inner()),
2260 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2261 K,
2262 V,
2263 L,
2264 Unbounded,
2265 O,
2266 R,
2267 >::collection_kind(
2268 )),
2269 },
2270 )
2271 }
2272
2273 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2274 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2275 /// each key.
2276 ///
2277 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2278 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2279 /// stream's [`Tick`] context.
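///
/// # Example
/// A small sketch, assuming the result is moved back to an asynchronous context with
/// [`KeyedStream::end_atomic`] before being observed:
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process
///     .source_iter(q!(vec![(1, 2), (2, 3)]))
///     .into_keyed();
/// let batch = numbers.batch(&tick, nondet!(/** test */));
/// batch.all_ticks_atomic().end_atomic().entries()
/// # }, |mut stream| async move {
/// // (1, 2), (2, 3)
/// # assert_eq!(stream.next().await.unwrap(), (1, 2));
/// # assert_eq!(stream.next().await.unwrap(), (2, 3));
/// # }));
/// # }
/// ```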
2280 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2281 let out_location = Atomic {
2282 tick: self.location.clone(),
2283 };
2284
2285 KeyedStream::new(
2286 out_location.clone(),
2287 HydroNode::YieldConcat {
2288 inner: Box::new(self.ir_node.into_inner()),
2289 metadata: out_location.new_node_metadata(KeyedStream::<
2290 K,
2291 V,
2292 Atomic<L>,
2293 Unbounded,
2294 O,
2295 R,
2296 >::collection_kind()),
2297 },
2298 )
2299 }
2300
2301 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2302 /// tick `T` always has the entries of `self` at tick `T - 1`.
2303 ///
2304 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2305 ///
2306 /// This operator enables stateful iterative processing with ticks, by sending data from one
2307 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2308 ///
2309 /// # Example
2310 /// ```rust
2311 /// # #[cfg(feature = "deploy")] {
2312 /// # use hydro_lang::prelude::*;
2313 /// # use futures::StreamExt;
2314 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2315 /// let tick = process.tick();
2316 /// # // ticks are lazy by default; this forces the second tick to run
2317 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2318 /// # let batch_first_tick = process
2319 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2320 /// # .batch(&tick, nondet!(/** test */))
2321 /// # .into_keyed();
2322 /// # let batch_second_tick = process
2323 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2324 /// # .batch(&tick, nondet!(/** test */))
2325 /// # .defer_tick()
2326 /// # .into_keyed(); // appears on the second tick
2327 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2328 /// # batch_first_tick.chain(batch_second_tick);
2329 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2330 /// changes_across_ticks // from the current tick
2331 /// )
2332 /// # .entries().all_ticks()
2333 /// # }, |mut stream| async move {
2334 /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2335 /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2336 /// # assert_eq!(stream.next().await.unwrap(), w);
2337 /// # }
2338 /// # }));
2339 /// # }
2340 /// ```
2341 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2342 KeyedStream::new(
2343 self.location.clone(),
2344 HydroNode::DeferTick {
2345 input: Box::new(self.ir_node.into_inner()),
2346 metadata: self.location.new_node_metadata(KeyedStream::<
2347 K,
2348 V,
2349 Tick<L>,
2350 Bounded,
2351 O,
2352 R,
2353 >::collection_kind()),
2354 },
2355 )
2356 }
2357}
2358
2359#[cfg(test)]
2360mod tests {
2361 #[cfg(feature = "deploy")]
2362 use futures::{SinkExt, StreamExt};
2363 #[cfg(feature = "deploy")]
2364 use hydro_deploy::Deployment;
2365 #[cfg(any(feature = "deploy", feature = "sim"))]
2366 use stageleft::q;
2367
2368 #[cfg(any(feature = "deploy", feature = "sim"))]
2369 use crate::compile::builder::FlowBuilder;
2370 #[cfg(feature = "deploy")]
2371 use crate::live_collections::stream::ExactlyOnce;
2372 #[cfg(feature = "sim")]
2373 use crate::live_collections::stream::{NoOrder, TotalOrder};
2374 #[cfg(any(feature = "deploy", feature = "sim"))]
2375 use crate::location::Location;
2376 #[cfg(any(feature = "deploy", feature = "sim"))]
2377 use crate::nondet::nondet;
2378
2379 #[cfg(feature = "deploy")]
2380 #[tokio::test]
2381 async fn reduce_watermark_filter() {
2382 let mut deployment = Deployment::new();
2383
2384 let flow = FlowBuilder::new();
2385 let node = flow.process::<()>();
2386 let external = flow.external::<()>();
2387
2388 let node_tick = node.tick();
2389 let watermark = node_tick.singleton(q!(1));
2390
2391 let sum = node
2392 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2393 .into_keyed()
2394 .reduce_watermark(
2395 watermark,
2396 q!(|acc, v| {
2397 *acc += v;
2398 }),
2399 )
2400 .snapshot(&node_tick, nondet!(/** test */))
2401 .entries()
2402 .all_ticks()
2403 .send_bincode_external(&external);
2404
2405 let nodes = flow
2406 .with_process(&node, deployment.Localhost())
2407 .with_external(&external, deployment.Localhost())
2408 .deploy(&mut deployment);
2409
2410 deployment.deploy().await.unwrap();
2411
2412 let mut out = nodes.connect(sum).await;
2413
2414 deployment.start().await.unwrap();
2415
2416 assert_eq!(out.next().await.unwrap(), (2, 204));
2417 }
2418
2419 #[cfg(feature = "deploy")]
2420 #[tokio::test]
2421 async fn reduce_watermark_garbage_collect() {
2422 let mut deployment = Deployment::new();
2423
2424 let flow = FlowBuilder::new();
2425 let node = flow.process::<()>();
2426 let external = flow.external::<()>();
2427 let (tick_send, tick_trigger) =
2428 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2429
2430 let node_tick = node.tick();
2431 let (watermark_complete_cycle, watermark) =
2432 node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2433 let next_watermark = watermark.clone().map(q!(|v| v + 1));
2434 watermark_complete_cycle.complete_next_tick(next_watermark);
2435
2436 let tick_triggered_input = node
2437 .source_iter(q!([(3, 103)]))
2438 .batch(&node_tick, nondet!(/** test */))
2439 .filter_if_some(
2440 tick_trigger
2441 .clone()
2442 .batch(&node_tick, nondet!(/** test */))
2443 .first(),
2444 )
2445 .all_ticks();
2446
2447 let sum = node
2448 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2449 .interleave(tick_triggered_input)
2450 .into_keyed()
2451 .reduce_watermark_commutative(
2452 watermark,
2453 q!(|acc, v| {
2454 *acc += v;
2455 }),
2456 )
2457 .snapshot(&node_tick, nondet!(/** test */))
2458 .entries()
2459 .all_ticks()
2460 .send_bincode_external(&external);
2461
2462 let nodes = flow
2463 .with_default_optimize()
2464 .with_process(&node, deployment.Localhost())
2465 .with_external(&external, deployment.Localhost())
2466 .deploy(&mut deployment);
2467
2468 deployment.deploy().await.unwrap();
2469
2470 let mut tick_send = nodes.connect(tick_send).await;
2471 let mut out_recv = nodes.connect(sum).await;
2472
2473 deployment.start().await.unwrap();
2474
2475 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2476
2477 tick_send.send(()).await.unwrap();
2478
2479 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2480 }
2481
2482 #[cfg(feature = "sim")]
2483 #[test]
2484 #[should_panic]
2485 fn sim_batch_nondet_size() {
2486 let flow = FlowBuilder::new();
2487 let node = flow.process::<()>();
2488
2489 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2490
2491 let tick = node.tick();
2492 let out_recv = input
2493 .batch(&tick, nondet!(/** test */))
2494 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2495 .entries()
2496 .all_ticks()
2497 .sim_output();
2498
2499 flow.sim().exhaustive(async || {
2500 out_recv
2501 .assert_yields_only_unordered([(1, vec![1, 2])])
2502 .await;
2503 });
2504 }
2505
2506 #[cfg(feature = "sim")]
2507 #[test]
2508 fn sim_batch_preserves_group_order() {
2509 let flow = FlowBuilder::new();
2510 let node = flow.process::<()>();
2511
2512 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2513
2514 let tick = node.tick();
2515 let out_recv = input
2516 .batch(&tick, nondet!(/** test */))
2517 .all_ticks()
2518 .fold_early_stop(
2519 q!(|| 0),
2520 q!(|acc, v| {
2521 *acc = std::cmp::max(v, *acc);
2522 *acc >= 2
2523 }),
2524 )
2525 .entries()
2526 .sim_output();
2527
2528 let instances = flow.sim().exhaustive(async || {
2529 out_recv
2530 .assert_yields_only_unordered([(1, 2), (2, 3)])
2531 .await;
2532 });
2533
2534 assert_eq!(instances, 8);
2535 // - three cases: all three in separate ticks (pick where (2, 3) is)
2536 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2537 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2538 // - one case: all three together
2539 }
2540
2541 #[cfg(feature = "sim")]
2542 #[test]
2543 fn sim_batch_unordered_shuffles() {
2544 let flow = FlowBuilder::new();
2545 let node = flow.process::<()>();
2546
2547 let input = node
2548 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2549 .into_keyed()
2550 .weakest_ordering();
2551
2552 let tick = node.tick();
2553 let out_recv = input
2554 .batch(&tick, nondet!(/** test */))
2555 .all_ticks()
2556 .entries()
2557 .sim_output();
2558
2559 let instances = flow.sim().exhaustive(async || {
2560 out_recv
2561 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2562 .await;
2563 });
2564
2565 assert_eq!(instances, 13);
2566 // - 6 (3 * 2) cases: all three in separate ticks (pick where (2, 3) is), and order of (1, 1), (1, 2)
2567 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2568 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2569 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2570 }
2571
2572 #[cfg(feature = "sim")]
2573 #[test]
2574 #[should_panic]
2575 fn sim_observe_order_batched() {
2576 let flow = FlowBuilder::new();
2577 let node = flow.process::<()>();
2578
2579 let (port, input) = node.sim_input::<_, NoOrder, _>();
2580
2581 let tick = node.tick();
2582 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2583 let out_recv = batch
2584 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2585 .all_ticks()
2586 .first()
2587 .entries()
2588 .sim_output();
2589
2590 flow.sim().exhaustive(async || {
2591 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2592 out_recv
2593 .assert_yields_only_unordered([(1, 1), (2, 1)])
2594 .await; // fails with assume_ordering
2595 });
2596 }
2597
2598 #[cfg(feature = "sim")]
2599 #[test]
2600 fn sim_observe_order_batched_count() {
2601 let flow = FlowBuilder::new();
2602 let node = flow.process::<()>();
2603
2604 let (port, input) = node.sim_input::<_, NoOrder, _>();
2605
2606 let tick = node.tick();
2607 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2608 let out_recv = batch
2609 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2610 .all_ticks()
2611 .entries()
2612 .sim_output();
2613
2614 let instance_count = flow.sim().exhaustive(async || {
2615 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2616 let _ = out_recv.collect_sorted::<Vec<_>>().await;
2617 });
2618
2619 assert_eq!(instance_count, 104); // too complicated to enumerate here, but fewer than the stream equivalent
2620 }
2621}