hydro_lang/keyed_stream.rs
1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use stageleft::{IntoQuotedMut, QuotedWithContext, q};
5
6use crate::boundedness::Boundedness;
7use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
8use crate::ir::HydroNode;
9use crate::keyed_singleton::KeyedSingleton;
10use crate::location::tick::NoAtomic;
11use crate::location::{LocationId, NoTick, check_matching_location};
12use crate::manual_expr::ManualExpr;
13use crate::stream::{ExactlyOnce, MinOrder, MinRetries};
14use crate::unsafety::NonDet;
15use crate::*;
16
17/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`,
18/// where the order of keys is non-deterministic but the order *within* each group may
19/// be deterministic.
20///
21/// Type Parameters:
22/// - `K`: the type of the key for each group
23/// - `V`: the type of the elements inside each group
24/// - `Loc`: the [`Location`] where the keyed stream is materialized
25/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
26/// - `Order`: tracks whether the elements within each group have deterministic order
27/// ([`TotalOrder`]) or not ([`NoOrder`])
28/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
29/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::stream::AtLeastOnce`])
30pub struct KeyedStream<K, V, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> {
31 pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retries>,
32 pub(crate) _phantom_order: PhantomData<Order>,
33}
34
35impl<'a, K, V, L, B: Boundedness, R> From<KeyedStream<K, V, L, B, TotalOrder, R>>
36 for KeyedStream<K, V, L, B, NoOrder, R>
37where
38 L: Location<'a>,
39{
40 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
41 KeyedStream {
42 underlying: stream.underlying,
43 _phantom_order: Default::default(),
44 }
45 }
46}
47
48impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order, Retries> Clone
49 for KeyedStream<K, V, Loc, Bound, Order, Retries>
50{
51 fn clone(&self) -> Self {
52 KeyedStream {
53 underlying: self.underlying.clone(),
54 _phantom_order: PhantomData,
55 }
56 }
57}
58
59impl<'a, K, V, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker>
60 for KeyedStream<K, V, L, B, O, R>
61where
62 L: Location<'a> + NoTick,
63{
64 type Location = L;
65
66 fn create_source(ident: syn::Ident, location: L) -> Self {
67 Stream::create_source(ident, location).into_keyed()
68 }
69}
70
71impl<'a, K, V, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker>
72 for KeyedStream<K, V, L, B, O, R>
73where
74 L: Location<'a> + NoTick,
75{
76 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77 self.underlying.complete(ident, expected_location);
78 }
79}
80
81impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R> {
82 /// Explicitly "casts" the keyed stream to a type with a different ordering
83 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
84 /// by the type-system.
85 ///
86 /// # Non-Determinism
87 /// This function is used as an escape hatch, and any mistakes in the
88 /// provided ordering guarantee will propagate into the guarantees
89 /// for the rest of the program.
90 pub fn assume_ordering<O2>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
91 KeyedStream {
92 underlying: self.underlying,
93 _phantom_order: PhantomData,
94 }
95 }
96
97 /// Explicitly "casts" the keyed stream to a type with a different retries
98 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
99 /// be proven by the type-system.
100 ///
101 /// # Non-Determinism
102 /// This function is used as an escape hatch, and any mistakes in the
103 /// provided retries guarantee will propagate into the guarantees
104 /// for the rest of the program.
105 pub fn assume_retries<R2>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
106 KeyedStream {
107 underlying: self.underlying.assume_retries::<R2>(nondet),
108 _phantom_order: PhantomData,
109 }
110 }
111
112 /// Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic
113 /// element ordering.
114 ///
115 /// # Example
116 /// ```rust
117 /// # use hydro_lang::*;
118 /// # use futures::StreamExt;
119 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
120 /// process
121 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
122 /// .into_keyed()
123 /// .entries()
124 /// # }, |mut stream| async move {
125 /// // (1, 2), (1, 3), (2, 4) in any order
126 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
127 /// # assert_eq!(stream.next().await.unwrap(), w);
128 /// # }
129 /// # }));
130 /// ```
131 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
132 self.underlying
133 }
134
135 /// Flattens the keyed stream into a single stream of only the values, with non-deterministic
136 /// element ordering.
137 ///
138 /// # Example
139 /// ```rust
140 /// # use hydro_lang::*;
141 /// # use futures::StreamExt;
142 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
143 /// process
144 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
145 /// .into_keyed()
146 /// .values()
147 /// # }, |mut stream| async move {
148 /// // 2, 3, 4 in any order
149 /// # for w in vec![2, 3, 4] {
150 /// # assert_eq!(stream.next().await.unwrap(), w);
151 /// # }
152 /// # }));
153 /// ```
154 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
155 self.underlying.map(q!(|(_, v)| v))
156 }
157
158 /// Transforms each value by invoking `f` on each element, with keys staying the same
159 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
160 ///
161 /// If you do not want to modify the stream and instead only want to view
162 /// each item use [`KeyedStream::inspect`] instead.
163 ///
164 /// # Example
165 /// ```rust
166 /// # use hydro_lang::*;
167 /// # use futures::StreamExt;
168 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
169 /// process
170 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
171 /// .into_keyed()
172 /// .map(q!(|v| v + 1))
173 /// # .entries()
174 /// # }, |mut stream| async move {
175 /// // { 1: [3, 4], 2: [5] }
176 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
177 /// # assert_eq!(stream.next().await.unwrap(), w);
178 /// # }
179 /// # }));
180 /// ```
181 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
182 where
183 F: Fn(V) -> U + 'a,
184 {
185 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
186 KeyedStream {
187 underlying: self.underlying.map(q!({
188 let orig = f;
189 move |(k, v)| (k, orig(v))
190 })),
191 _phantom_order: Default::default(),
192 }
193 }
194
195 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
196 /// re-grouped even they are tuples; instead they will be grouped under the original key.
197 ///
198 /// If you do not want to modify the stream and instead only want to view
199 /// each item use [`KeyedStream::inspect_with_key`] instead.
200 ///
201 /// # Example
202 /// ```rust
203 /// # use hydro_lang::*;
204 /// # use futures::StreamExt;
205 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
206 /// process
207 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
208 /// .into_keyed()
209 /// .map_with_key(q!(|(k, v)| k + v))
210 /// # .entries()
211 /// # }, |mut stream| async move {
212 /// // { 1: [3, 4], 2: [6] }
213 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
214 /// # assert_eq!(stream.next().await.unwrap(), w);
215 /// # }
216 /// # }));
217 /// ```
218 pub fn map_with_key<U, F>(
219 self,
220 f: impl IntoQuotedMut<'a, F, L> + Copy,
221 ) -> KeyedStream<K, U, L, B, O, R>
222 where
223 F: Fn((K, V)) -> U + 'a,
224 K: Clone,
225 {
226 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
227 KeyedStream {
228 underlying: self.underlying.map(q!({
229 let orig = f;
230 move |(k, v)| {
231 let out = orig((k.clone(), v));
232 (k, out)
233 }
234 })),
235 _phantom_order: Default::default(),
236 }
237 }
238
239 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
240 /// `f`, preserving the order of the elements within the group.
241 ///
242 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
243 /// not modify or take ownership of the values. If you need to modify the values while filtering
244 /// use [`KeyedStream::filter_map`] instead.
245 ///
246 /// # Example
247 /// ```rust
248 /// # use hydro_lang::*;
249 /// # use futures::StreamExt;
250 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
251 /// process
252 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
253 /// .into_keyed()
254 /// .filter(q!(|&x| x > 2))
255 /// # .entries()
256 /// # }, |mut stream| async move {
257 /// // { 1: [3], 2: [4] }
258 /// # for w in vec![(1, 3), (2, 4)] {
259 /// # assert_eq!(stream.next().await.unwrap(), w);
260 /// # }
261 /// # }));
262 /// ```
263 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
264 where
265 F: Fn(&V) -> bool + 'a,
266 {
267 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
268 KeyedStream {
269 underlying: self.underlying.filter(q!({
270 let orig = f;
271 move |(_k, v)| orig(v)
272 })),
273 _phantom_order: Default::default(),
274 }
275 }
276
277 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
278 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
279 ///
280 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
281 /// not modify or take ownership of the values. If you need to modify the values while filtering
282 /// use [`KeyedStream::filter_map_with_key`] instead.
283 ///
284 /// # Example
285 /// ```rust
286 /// # use hydro_lang::*;
287 /// # use futures::StreamExt;
288 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
289 /// process
290 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
291 /// .into_keyed()
292 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
293 /// # .entries()
294 /// # }, |mut stream| async move {
295 /// // { 1: [3], 2: [4] }
296 /// # for w in vec![(1, 3), (2, 4)] {
297 /// # assert_eq!(stream.next().await.unwrap(), w);
298 /// # }
299 /// # }));
300 /// ```
301 pub fn filter_with_key<F>(
302 self,
303 f: impl IntoQuotedMut<'a, F, L> + Copy,
304 ) -> KeyedStream<K, V, L, B, O, R>
305 where
306 F: Fn(&(K, V)) -> bool + 'a,
307 {
308 KeyedStream {
309 underlying: self.underlying.filter(f),
310 _phantom_order: Default::default(),
311 }
312 }
313
314 /// An operator that both filters and maps each value, with keys staying the same.
315 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
316 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
317 ///
318 /// # Example
319 /// ```rust
320 /// # use hydro_lang::*;
321 /// # use futures::StreamExt;
322 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
323 /// process
324 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
325 /// .into_keyed()
326 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
327 /// # .entries()
328 /// # }, |mut stream| async move {
329 /// // { 1: [2], 2: [4] }
330 /// # for w in vec![(1, 2), (2, 4)] {
331 /// # assert_eq!(stream.next().await.unwrap(), w);
332 /// # }
333 /// # }));
334 /// ```
335 pub fn filter_map<U, F>(
336 self,
337 f: impl IntoQuotedMut<'a, F, L> + Copy,
338 ) -> KeyedStream<K, U, L, B, O, R>
339 where
340 F: Fn(V) -> Option<U> + 'a,
341 {
342 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
343 KeyedStream {
344 underlying: self.underlying.filter_map(q!({
345 let orig = f;
346 move |(k, v)| orig(v).map(|o| (k, o))
347 })),
348 _phantom_order: Default::default(),
349 }
350 }
351
352 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
353 /// re-grouped even they are tuples; instead they will be grouped under the original key.
354 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
355 ///
356 /// # Example
357 /// ```rust
358 /// # use hydro_lang::*;
359 /// # use futures::StreamExt;
360 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
361 /// process
362 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
363 /// .into_keyed()
364 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
365 /// # .entries()
366 /// # }, |mut stream| async move {
367 /// // { 2: [2] }
368 /// # for w in vec![(2, 2)] {
369 /// # assert_eq!(stream.next().await.unwrap(), w);
370 /// # }
371 /// # }));
372 /// ```
373 pub fn filter_map_with_key<U, F>(
374 self,
375 f: impl IntoQuotedMut<'a, F, L> + Copy,
376 ) -> KeyedStream<K, U, L, B, O, R>
377 where
378 F: Fn((K, V)) -> Option<U> + 'a,
379 K: Clone,
380 {
381 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
382 KeyedStream {
383 underlying: self.underlying.filter_map(q!({
384 let orig = f;
385 move |(k, v)| {
386 let out = orig((k.clone(), v));
387 out.map(|o| (k, o))
388 }
389 })),
390 _phantom_order: Default::default(),
391 }
392 }
393
394 /// An operator which allows you to "inspect" each element of a stream without
395 /// modifying it. The closure `f` is called on a reference to each value. This is
396 /// mainly useful for debugging, and should not be used to generate side-effects.
397 ///
398 /// # Example
399 /// ```rust
400 /// # use hydro_lang::*;
401 /// # use futures::StreamExt;
402 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
403 /// process
404 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
405 /// .into_keyed()
406 /// .inspect(q!(|v| println!("{}", v)))
407 /// # .entries()
408 /// # }, |mut stream| async move {
409 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
410 /// # assert_eq!(stream.next().await.unwrap(), w);
411 /// # }
412 /// # }));
413 /// ```
414 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
415 where
416 F: Fn(&V) + 'a,
417 {
418 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
419 KeyedStream {
420 underlying: self.underlying.inspect(q!({
421 let orig = f;
422 move |(_k, v)| orig(v)
423 })),
424 _phantom_order: Default::default(),
425 }
426 }
427
428 /// An operator which allows you to "inspect" each element of a stream without
429 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
430 /// mainly useful for debugging, and should not be used to generate side-effects.
431 ///
432 /// # Example
433 /// ```rust
434 /// # use hydro_lang::*;
435 /// # use futures::StreamExt;
436 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
437 /// process
438 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
439 /// .into_keyed()
440 /// .inspect(q!(|v| println!("{}", v)))
441 /// # .entries()
442 /// # }, |mut stream| async move {
443 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
444 /// # assert_eq!(stream.next().await.unwrap(), w);
445 /// # }
446 /// # }));
447 /// ```
448 pub fn inspect_with_key<F>(
449 self,
450 f: impl IntoQuotedMut<'a, F, L>,
451 ) -> KeyedStream<K, V, L, B, O, R>
452 where
453 F: Fn(&(K, V)) + 'a,
454 {
455 KeyedStream {
456 underlying: self.underlying.inspect(f),
457 _phantom_order: Default::default(),
458 }
459 }
460
461 /// An operator which allows you to "name" a `HydroNode`.
462 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
463 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
464 {
465 let mut node = self.underlying.ir_node.borrow_mut();
466 let metadata = node.metadata_mut();
467 metadata.tag = Some(name.to_string());
468 }
469 self
470 }
471}
472
473impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R> {
474 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
475 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
476 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
477 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
478 ///
479 /// Currently, both input streams must be [`Unbounded`].
480 ///
481 /// # Example
482 /// ```rust
483 /// # use hydro_lang::*;
484 /// # use futures::StreamExt;
485 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
486 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
487 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
488 /// numbers1.interleave(numbers2)
489 /// # .entries()
490 /// # }, |mut stream| async move {
491 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
492 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
493 /// # assert_eq!(stream.next().await.unwrap(), w);
494 /// # }
495 /// # }));
496 /// ```
497 pub fn interleave<O2, R2: MinRetries<R>>(
498 self,
499 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
500 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
501 where
502 R: MinRetries<R2, Min = R2::Min>,
503 {
504 self.entries().interleave(other.entries()).into_keyed()
505 }
506}
507
508impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
509where
510 K: Eq + Hash,
511 L: Location<'a>,
512{
513 /// A special case of [`Stream::scan`] for keyd streams. For each key group the values are transformed via the `f` combinator.
514 ///
515 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
516 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
517 /// early by returning `None`.
518 ///
519 /// The function takes a mutable reference to the accumulator and the current element, and returns
520 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
521 /// If the function returns `None`, the stream is terminated and no more elements are processed.
522 ///
523 /// # Example
524 /// ```rust
525 /// # use hydro_lang::*;
526 /// # use futures::StreamExt;
527 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
528 /// process
529 /// .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
530 /// .into_keyed()
531 /// .scan(
532 /// q!(|| 0),
533 /// q!(|acc, x| {
534 /// *acc += x;
535 /// Some(*acc)
536 /// }),
537 /// )
538 /// # .entries()
539 /// # }, |mut stream| async move {
540 /// // Output: { 0: [1, 3], 1: [3, 7] }
541 /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
542 /// # assert_eq!(stream.next().await.unwrap(), w);
543 /// # }
544 /// # }));
545 /// ```
546 pub fn scan<A, U, I, F>(
547 self,
548 init: impl IntoQuotedMut<'a, I, L> + Copy,
549 f: impl IntoQuotedMut<'a, F, L> + Copy,
550 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
551 where
552 K: Clone,
553 I: Fn() -> A + 'a,
554 F: Fn(&mut A, V) -> Option<U> + 'a,
555 {
556 KeyedStream {
557 underlying: self
558 .underlying
559 .assume_ordering::<TotalOrder>(
560 nondet!(/** keyed scan does not rely on order of keys */),
561 )
562 .scan_keyed(init, f)
563 .into(),
564 _phantom_order: Default::default(),
565 }
566 }
567
568 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed in-order across the values
569 /// in each group. But the aggregation function returns a boolean, which when true indicates that the aggregated
570 /// result is complete and can be released to downstream computation. Unlike [`Stream::fold_keyed`], this means that
571 /// even if the input stream is [`crate::Unbounded`], the outputs of the fold can be processed like normal stream elements.
572 ///
573 /// # Example
574 /// ```rust
575 /// # use hydro_lang::*;
576 /// # use futures::StreamExt;
577 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
578 /// process
579 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
580 /// .into_keyed()
581 /// .fold_early_stop(
582 /// q!(|| 0),
583 /// q!(|acc, x| {
584 /// *acc += x;
585 /// x % 2 == 0
586 /// }),
587 /// )
588 /// # .entries()
589 /// # }, |mut stream| async move {
590 /// // Output: { 0: 2, 1: 9 }
591 /// # for w in vec![(0, 2), (1, 9)] {
592 /// # assert_eq!(stream.next().await.unwrap(), w);
593 /// # }
594 /// # }));
595 /// ```
596 pub fn fold_early_stop<A, I, F>(
597 self,
598 init: impl IntoQuotedMut<'a, I, L> + Copy,
599 f: impl IntoQuotedMut<'a, F, L> + Copy,
600 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
601 where
602 K: Clone,
603 I: Fn() -> A + 'a,
604 F: Fn(&mut A, V) -> bool + 'a,
605 {
606 KeyedSingleton {
607 underlying: {
608 self.underlying
609 .assume_ordering::<TotalOrder>(
610 nondet!(/** keyed fold does not rely on order of keys */),
611 )
612 .fold_keyed_early_stop(init, f)
613 .into()
614 },
615 }
616 }
617
618 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
619 ///
620 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
621 /// to depend on the order of elements in the group.
622 ///
623 /// If the input and output value types are the same and do not require initialization then use
624 /// [`KeyedStream::reduce`].
625 ///
626 /// # Example
627 /// ```rust
628 /// # use hydro_lang::*;
629 /// # use futures::StreamExt;
630 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
631 /// let tick = process.tick();
632 /// let numbers = process
633 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
634 /// .into_keyed();
635 /// let batch = numbers.batch(&tick, nondet!(/** test */));
636 /// batch
637 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
638 /// .entries()
639 /// .all_ticks()
640 /// # }, |mut stream| async move {
641 /// // (1, 5), (2, 7)
642 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
643 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
644 /// # }));
645 /// ```
646 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
647 self,
648 init: impl IntoQuotedMut<'a, I, L>,
649 comb: impl IntoQuotedMut<'a, F, L>,
650 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
651 let init = init.splice_fn0_ctx(&self.underlying.location).into();
652 let comb = comb
653 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
654 .into();
655
656 let out_ir = HydroNode::FoldKeyed {
657 init,
658 acc: comb,
659 input: Box::new(self.underlying.ir_node.into_inner()),
660 metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
661 };
662
663 KeyedSingleton {
664 underlying: Stream::new(self.underlying.location, out_ir),
665 }
666 }
667
668 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
669 ///
670 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
671 /// to depend on the order of elements in the stream.
672 ///
673 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
674 ///
675 /// # Example
676 /// ```rust
677 /// # use hydro_lang::*;
678 /// # use futures::StreamExt;
679 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
680 /// let tick = process.tick();
681 /// let numbers = process
682 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
683 /// .into_keyed();
684 /// let batch = numbers.batch(&tick, nondet!(/** test */));
685 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
686 /// # }, |mut stream| async move {
687 /// // (1, 5), (2, 7)
688 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
689 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
690 /// # }));
691 /// ```
692 pub fn reduce<F: Fn(&mut V, V) + 'a>(
693 self,
694 comb: impl IntoQuotedMut<'a, F, L>,
695 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
696 let f = comb
697 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
698 .into();
699
700 let out_ir = HydroNode::ReduceKeyed {
701 f,
702 input: Box::new(self.underlying.ir_node.into_inner()),
703 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
704 };
705
706 KeyedSingleton {
707 underlying: Stream::new(self.underlying.location, out_ir),
708 }
709 }
710
711 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
712 ///
713 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
714 /// to depend on the order of elements in the stream.
715 ///
716 /// # Example
717 /// ```rust
718 /// # use hydro_lang::*;
719 /// # use futures::StreamExt;
720 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
721 /// let tick = process.tick();
722 /// let watermark = tick.singleton(q!(1));
723 /// let numbers = process
724 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
725 /// .into_keyed();
726 /// let batch = numbers.batch(&tick, nondet!(/** test */));
727 /// batch
728 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
729 /// .entries()
730 /// .all_ticks()
731 /// # }, |mut stream| async move {
732 /// // (2, 204)
733 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
734 /// # }));
735 /// ```
736 pub fn reduce_watermark<O, F>(
737 self,
738 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
739 comb: impl IntoQuotedMut<'a, F, L>,
740 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
741 where
742 O: Clone,
743 F: Fn(&mut V, V) + 'a,
744 {
745 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
746 check_matching_location(&self.underlying.location.root(), other.location.outer());
747 let f = comb
748 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
749 .into();
750
751 let out_ir = Stream::new(
752 self.underlying.location.clone(),
753 HydroNode::ReduceKeyedWatermark {
754 f,
755 input: Box::new(self.underlying.ir_node.into_inner()),
756 watermark: Box::new(other.ir_node.into_inner()),
757 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
758 },
759 );
760
761 KeyedSingleton { underlying: out_ir }
762 }
763}
764
765impl<'a, K, V, L, B: Boundedness, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
766where
767 K: Eq + Hash,
768 L: Location<'a>,
769{
770 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
771 ///
772 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
773 ///
774 /// If the input and output value types are the same and do not require initialization then use
775 /// [`KeyedStream::reduce_commutative`].
776 ///
777 /// # Example
778 /// ```rust
779 /// # use hydro_lang::*;
780 /// # use futures::StreamExt;
781 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
782 /// let tick = process.tick();
783 /// let numbers = process
784 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
785 /// .into_keyed();
786 /// let batch = numbers.batch(&tick, nondet!(/** test */));
787 /// batch
788 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
789 /// .entries()
790 /// .all_ticks()
791 /// # }, |mut stream| async move {
792 /// // (1, 5), (2, 7)
793 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
794 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
795 /// # }));
796 /// ```
797 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
798 self,
799 init: impl IntoQuotedMut<'a, I, L>,
800 comb: impl IntoQuotedMut<'a, F, L>,
801 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
802 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
803 .fold(init, comb)
804 }
805
806 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
807 ///
808 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
809 ///
810 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
811 ///
812 /// # Example
813 /// ```rust
814 /// # use hydro_lang::*;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
817 /// let tick = process.tick();
818 /// let numbers = process
819 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
820 /// .into_keyed();
821 /// let batch = numbers.batch(&tick, nondet!(/** test */));
822 /// batch
823 /// .reduce_commutative(q!(|acc, x| *acc += x))
824 /// .entries()
825 /// .all_ticks()
826 /// # }, |mut stream| async move {
827 /// // (1, 5), (2, 7)
828 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
829 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
830 /// # }));
831 /// ```
832 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
833 self,
834 comb: impl IntoQuotedMut<'a, F, L>,
835 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
836 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
837 .reduce(comb)
838 }
839
840 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
841 ///
842 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
843 ///
844 /// # Example
845 /// ```rust
846 /// # use hydro_lang::*;
847 /// # use futures::StreamExt;
848 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
849 /// let tick = process.tick();
850 /// let watermark = tick.singleton(q!(1));
851 /// let numbers = process
852 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
853 /// .into_keyed();
854 /// let batch = numbers.batch(&tick, nondet!(/** test */));
855 /// batch
856 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
857 /// .entries()
858 /// .all_ticks()
859 /// # }, |mut stream| async move {
860 /// // (2, 204)
861 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
862 /// # }));
863 /// ```
864 pub fn reduce_watermark_commutative<O2, F>(
865 self,
866 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
867 comb: impl IntoQuotedMut<'a, F, L>,
868 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
869 where
870 O2: Clone,
871 F: Fn(&mut V, V) + 'a,
872 {
873 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
874 .reduce_watermark(other, comb)
875 }
876}
877
878impl<'a, K, V, L, B: Boundedness, R> KeyedStream<K, V, L, B, TotalOrder, R>
879where
880 K: Eq + Hash,
881 L: Location<'a>,
882{
883 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
884 ///
885 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
886 ///
887 /// If the input and output value types are the same and do not require initialization then use
888 /// [`KeyedStream::reduce_idempotent`].
889 ///
890 /// # Example
891 /// ```rust
892 /// # use hydro_lang::*;
893 /// # use futures::StreamExt;
894 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
895 /// let tick = process.tick();
896 /// let numbers = process
897 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
898 /// .into_keyed();
899 /// let batch = numbers.batch(&tick, nondet!(/** test */));
900 /// batch
901 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
902 /// .entries()
903 /// .all_ticks()
904 /// # }, |mut stream| async move {
905 /// // (1, false), (2, true)
906 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
907 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
908 /// # }));
909 /// ```
910 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
911 self,
912 init: impl IntoQuotedMut<'a, I, L>,
913 comb: impl IntoQuotedMut<'a, F, L>,
914 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
915 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
916 .fold(init, comb)
917 }
918
919 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
920 ///
921 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
922 ///
923 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
924 ///
925 /// # Example
926 /// ```rust
927 /// # use hydro_lang::*;
928 /// # use futures::StreamExt;
929 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
930 /// let tick = process.tick();
931 /// let numbers = process
932 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
933 /// .into_keyed();
934 /// let batch = numbers.batch(&tick, nondet!(/** test */));
935 /// batch
936 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
937 /// .entries()
938 /// .all_ticks()
939 /// # }, |mut stream| async move {
940 /// // (1, false), (2, true)
941 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
942 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
943 /// # }));
944 /// ```
945 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
946 self,
947 comb: impl IntoQuotedMut<'a, F, L>,
948 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
949 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
950 .reduce(comb)
951 }
952
953 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
954 ///
955 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
956 ///
957 /// # Example
958 /// ```rust
959 /// # use hydro_lang::*;
960 /// # use futures::StreamExt;
961 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
962 /// let tick = process.tick();
963 /// let watermark = tick.singleton(q!(1));
964 /// let numbers = process
965 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
966 /// .into_keyed();
967 /// let batch = numbers.batch(&tick, nondet!(/** test */));
968 /// batch
969 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
970 /// .entries()
971 /// .all_ticks()
972 /// # }, |mut stream| async move {
973 /// // (2, true)
974 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
975 /// # }));
976 /// ```
977 pub fn reduce_watermark_idempotent<O2, F>(
978 self,
979 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
980 comb: impl IntoQuotedMut<'a, F, L>,
981 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
982 where
983 O2: Clone,
984 F: Fn(&mut V, V) + 'a,
985 {
986 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
987 .reduce_watermark(other, comb)
988 }
989}
990
991impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
992where
993 K: Eq + Hash,
994 L: Location<'a>,
995{
996 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
997 ///
998 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
999 /// as there may be non-deterministic duplicates.
1000 ///
1001 /// If the input and output value types are the same and do not require initialization then use
1002 /// [`KeyedStream::reduce_commutative_idempotent`].
1003 ///
1004 /// # Example
1005 /// ```rust
1006 /// # use hydro_lang::*;
1007 /// # use futures::StreamExt;
1008 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1009 /// let tick = process.tick();
1010 /// let numbers = process
1011 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1012 /// .into_keyed();
1013 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1014 /// batch
1015 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1016 /// .entries()
1017 /// .all_ticks()
1018 /// # }, |mut stream| async move {
1019 /// // (1, false), (2, true)
1020 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1021 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1022 /// # }));
1023 /// ```
1024 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1025 self,
1026 init: impl IntoQuotedMut<'a, I, L>,
1027 comb: impl IntoQuotedMut<'a, F, L>,
1028 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1029 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1030 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1031 .fold(init, comb)
1032 }
1033
1034 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1035 ///
1036 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1037 /// as there may be non-deterministic duplicates.
1038 ///
1039 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1040 ///
1041 /// # Example
1042 /// ```rust
1043 /// # use hydro_lang::*;
1044 /// # use futures::StreamExt;
1045 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1046 /// let tick = process.tick();
1047 /// let numbers = process
1048 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1049 /// .into_keyed();
1050 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1051 /// batch
1052 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1053 /// .entries()
1054 /// .all_ticks()
1055 /// # }, |mut stream| async move {
1056 /// // (1, false), (2, true)
1057 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1058 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1059 /// # }));
1060 /// ```
1061 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1062 self,
1063 comb: impl IntoQuotedMut<'a, F, L>,
1064 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1065 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1066 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1067 .reduce(comb)
1068 }
1069
1070 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1071 ///
1072 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1073 /// as there may be non-deterministic duplicates.
1074 ///
1075 /// # Example
1076 /// ```rust
1077 /// # use hydro_lang::*;
1078 /// # use futures::StreamExt;
1079 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1080 /// let tick = process.tick();
1081 /// let watermark = tick.singleton(q!(1));
1082 /// let numbers = process
1083 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1084 /// .into_keyed();
1085 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1086 /// batch
1087 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1088 /// .entries()
1089 /// .all_ticks()
1090 /// # }, |mut stream| async move {
1091 /// // (2, true)
1092 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1093 /// # }));
1094 /// ```
1095 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1096 self,
1097 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1098 comb: impl IntoQuotedMut<'a, F, L>,
1099 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1100 where
1101 O2: Clone,
1102 F: Fn(&mut V, V) + 'a,
1103 {
1104 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1105 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1106 .reduce_watermark(other, comb)
1107 }
1108
1109 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1110 /// whose keys are not in the bounded stream.
1111 ///
1112 /// # Example
1113 /// ```rust
1114 /// # use hydro_lang::*;
1115 /// # use futures::StreamExt;
1116 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1117 /// let tick = process.tick();
1118 /// let keyed_stream = process
1119 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1120 /// .batch(&tick, nondet!(/** test */))
1121 /// .into_keyed();
1122 /// let keys_to_remove = process
1123 /// .source_iter(q!(vec![1, 2]))
1124 /// .batch(&tick, nondet!(/** test */));
1125 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1126 /// # .entries()
1127 /// # }, |mut stream| async move {
1128 /// // { 3: ['c'], 4: ['d'] }
1129 /// # for w in vec![(3, 'c'), (4, 'd')] {
1130 /// # assert_eq!(stream.next().await.unwrap(), w);
1131 /// # }
1132 /// # }));
1133 pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self {
1134 KeyedStream {
1135 underlying: self.entries().anti_join(other),
1136 _phantom_order: Default::default(),
1137 }
1138 }
1139}
1140
1141impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, L, B, O, R>
1142where
1143 L: Location<'a> + NoTick + NoAtomic,
1144{
1145 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1146 KeyedStream {
1147 underlying: self.underlying.atomic(tick),
1148 _phantom_order: Default::default(),
1149 }
1150 }
1151
1152 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1153 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1154 /// the order of the input.
1155 ///
1156 /// # Non-Determinism
1157 /// The batch boundaries are non-deterministic and may change across executions.
1158 pub fn batch(
1159 self,
1160 tick: &Tick<L>,
1161 nondet: NonDet,
1162 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1163 self.atomic(tick).batch(nondet)
1164 }
1165}
1166
1167impl<'a, K, V, L, B: Boundedness, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
1168where
1169 L: Location<'a> + NoTick + NoAtomic,
1170{
1171 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1172 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1173 /// the order of the input.
1174 ///
1175 /// # Non-Determinism
1176 /// The batch boundaries are non-deterministic and may change across executions.
1177 pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1178 KeyedStream {
1179 underlying: self.underlying.batch(nondet),
1180 _phantom_order: Default::default(),
1181 }
1182 }
1183}
1184
1185impl<'a, K, V, L, O, R> KeyedStream<K, V, L, Bounded, O, R>
1186where
1187 L: Location<'a>,
1188{
1189 pub fn chain<O2>(
1190 self,
1191 other: KeyedStream<K, V, L, Bounded, O2, R>,
1192 ) -> KeyedStream<K, V, L, Bounded, O::Min, R>
1193 where
1194 O: MinOrder<O2>,
1195 {
1196 KeyedStream {
1197 underlying: self.underlying.chain(other.underlying),
1198 _phantom_order: Default::default(),
1199 }
1200 }
1201}
1202
1203impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1204where
1205 L: Location<'a>,
1206{
1207 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1208 KeyedStream {
1209 underlying: self.underlying.all_ticks(),
1210 _phantom_order: Default::default(),
1211 }
1212 }
1213}
1214
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::location::Location;
    use crate::{FlowBuilder, nondet};

    /// Runs `reduce_watermark` over a fixed set of keyed values with a constant watermark of 1
    /// and checks that the first entry observed externally is `(2, 204)`.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Constant watermark value used for the whole test.
        let watermark = node_tick.singleton(q!(1));

        // Key 2 appears twice with value 102, so its per-key sum is 204.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the output before starting the deployment.
        let mut out = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // NOTE(review): keys 0 and 1 are presumably dropped by the watermark of 1; the exact
        // boundary semantics live in `reduce_watermark`, which is not shown here.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that changes from tick to tick,
    /// and injects an extra `(3, 103)` element only after an external trigger is sent.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // External channel the test uses to signal the node (see `tick_send.send(())` below).
        let (tick_send, tick_trigger) = node.source_external_bincode(&external);

        let node_tick = node.tick();
        // The watermark is a cycle seeded with 1; the value fed back for the next tick is the
        // current value plus one.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // `(3, 103)` is gated on the trigger batch via `continue_if`.
        // NOTE(review): presumably it only passes in ticks where a trigger message arrived.
        let tick_triggered_input = node
            .source_iter(q!([(3, 103)]))
            .batch(&node_tick, nondet!(/** test */))
            .continue_if(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both ends of the external channels before starting the deployment.
        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
        let mut out_recv = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // Before the trigger is sent, the first observed entry is the sum for key 2.
        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Send the trigger that feeds the gate on the `(3, 103)` input.
        tick_send.send(()).await.unwrap();

        // NOTE(review): the next entry being `(3, 103)` rather than `(2, 204)` presumably
        // shows that key 2 was dropped by the advancing watermark. TODO confirm.
        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
}