hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::tick::NoAtomic;
20use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
21use crate::manual_expr::ManualExpr;
22use crate::nondet::{NonDet, nondet};
23
24pub mod networking;
25
26/// Streaming elements of type `V` grouped by a key of type `K`.
27///
28/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
29/// order of keys is non-deterministic but the order *within* each group may be deterministic.
30///
31/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
32/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
33/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
34///
35/// Type Parameters:
36/// - `K`: the type of the key for each group
37/// - `V`: the type of the elements inside each group
38/// - `Loc`: the [`Location`] where the keyed stream is materialized
39/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
40/// - `Order`: tracks whether the elements within each group have deterministic order
41/// ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The flat stream of `(key, value)` pairs backing this keyed stream. It is always typed
    /// as [`NoOrder`]; the per-group ordering guarantee lives only in `_phantom_order`.
    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
    /// Zero-sized marker that carries the per-group `Order` type parameter.
    pub(crate) _phantom_order: PhantomData<Order>,
}
55
56impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
57 for KeyedStream<K, V, L, B, NoOrder, R>
58where
59 L: Location<'a>,
60{
61 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
62 KeyedStream {
63 underlying: stream.underlying,
64 _phantom_order: Default::default(),
65 }
66 }
67}
68
69impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
70 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
71{
72 fn clone(&self) -> Self {
73 KeyedStream {
74 underlying: self.underlying.clone(),
75 _phantom_order: PhantomData,
76 }
77 }
78}
79
80impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
81 for KeyedStream<K, V, L, B, O, R>
82where
83 L: Location<'a> + NoTick,
84{
85 type Location = L;
86
87 fn create_source(ident: syn::Ident, location: L) -> Self {
88 Stream::create_source(ident, location).into_keyed()
89 }
90}
91
92impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
93 for KeyedStream<K, V, L, B, O, R>
94where
95 L: Location<'a> + NoTick,
96{
97 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98 self.underlying.complete(ident, expected_location);
99 }
100}
101
102impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
103 KeyedStream<K, V, L, B, O, R>
104{
105 /// Explicitly "casts" the keyed stream to a type with a different ordering
106 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
107 /// by the type-system.
108 ///
109 /// # Non-Determinism
110 /// This function is used as an escape hatch, and any mistakes in the
111 /// provided ordering guarantee will propagate into the guarantees
112 /// for the rest of the program.
113 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
114 KeyedStream {
115 underlying: self.underlying,
116 _phantom_order: PhantomData,
117 }
118 }
119
120 /// Explicitly "casts" the keyed stream to a type with a different retries
121 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
122 /// be proven by the type-system.
123 ///
124 /// # Non-Determinism
125 /// This function is used as an escape hatch, and any mistakes in the
126 /// provided retries guarantee will propagate into the guarantees
127 /// for the rest of the program.
128 pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
129 KeyedStream {
130 underlying: self.underlying.assume_retries::<R2>(nondet),
131 _phantom_order: PhantomData,
132 }
133 }
134
135 /// Flattens the keyed stream into an unordered stream of key-value pairs.
136 ///
137 /// # Example
138 /// ```rust
139 /// # use hydro_lang::prelude::*;
140 /// # use futures::StreamExt;
141 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
142 /// process
143 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
144 /// .into_keyed()
145 /// .entries()
146 /// # }, |mut stream| async move {
147 /// // (1, 2), (1, 3), (2, 4) in any order
148 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
149 /// # assert_eq!(stream.next().await.unwrap(), w);
150 /// # }
151 /// # }));
152 /// ```
153 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
154 self.underlying
155 }
156
157 /// Flattens the keyed stream into an unordered stream of only the values.
158 ///
159 /// # Example
160 /// ```rust
161 /// # use hydro_lang::prelude::*;
162 /// # use futures::StreamExt;
163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164 /// process
165 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
166 /// .into_keyed()
167 /// .values()
168 /// # }, |mut stream| async move {
169 /// // 2, 3, 4 in any order
170 /// # for w in vec![2, 3, 4] {
171 /// # assert_eq!(stream.next().await.unwrap(), w);
172 /// # }
173 /// # }));
174 /// ```
175 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
176 self.underlying.map(q!(|(_, v)| v))
177 }
178
179 /// Transforms each value by invoking `f` on each element, with keys staying the same
180 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
181 ///
182 /// If you do not want to modify the stream and instead only want to view
183 /// each item use [`KeyedStream::inspect`] instead.
184 ///
185 /// # Example
186 /// ```rust
187 /// # use hydro_lang::prelude::*;
188 /// # use futures::StreamExt;
189 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
190 /// process
191 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
192 /// .into_keyed()
193 /// .map(q!(|v| v + 1))
194 /// # .entries()
195 /// # }, |mut stream| async move {
196 /// // { 1: [3, 4], 2: [5] }
197 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
198 /// # assert_eq!(stream.next().await.unwrap(), w);
199 /// # }
200 /// # }));
201 /// ```
202 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
203 where
204 F: Fn(V) -> U + 'a,
205 {
206 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
207 KeyedStream {
208 underlying: self.underlying.map(q!({
209 let orig = f;
210 move |(k, v)| (k, orig(v))
211 })),
212 _phantom_order: Default::default(),
213 }
214 }
215
/// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
218 ///
219 /// If you do not want to modify the stream and instead only want to view
220 /// each item use [`KeyedStream::inspect_with_key`] instead.
221 ///
222 /// # Example
223 /// ```rust
224 /// # use hydro_lang::prelude::*;
225 /// # use futures::StreamExt;
226 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
227 /// process
228 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
229 /// .into_keyed()
230 /// .map_with_key(q!(|(k, v)| k + v))
231 /// # .entries()
232 /// # }, |mut stream| async move {
233 /// // { 1: [3, 4], 2: [6] }
234 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
235 /// # assert_eq!(stream.next().await.unwrap(), w);
236 /// # }
237 /// # }));
238 /// ```
239 pub fn map_with_key<U, F>(
240 self,
241 f: impl IntoQuotedMut<'a, F, L> + Copy,
242 ) -> KeyedStream<K, U, L, B, O, R>
243 where
244 F: Fn((K, V)) -> U + 'a,
245 K: Clone,
246 {
247 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
248 KeyedStream {
249 underlying: self.underlying.map(q!({
250 let orig = f;
251 move |(k, v)| {
252 let out = orig((k.clone(), v));
253 (k, out)
254 }
255 })),
256 _phantom_order: Default::default(),
257 }
258 }
259
260 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
261 /// `f`, preserving the order of the elements within the group.
262 ///
263 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
264 /// not modify or take ownership of the values. If you need to modify the values while filtering
265 /// use [`KeyedStream::filter_map`] instead.
266 ///
267 /// # Example
268 /// ```rust
269 /// # use hydro_lang::prelude::*;
270 /// # use futures::StreamExt;
271 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
272 /// process
273 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
274 /// .into_keyed()
275 /// .filter(q!(|&x| x > 2))
276 /// # .entries()
277 /// # }, |mut stream| async move {
278 /// // { 1: [3], 2: [4] }
279 /// # for w in vec![(1, 3), (2, 4)] {
280 /// # assert_eq!(stream.next().await.unwrap(), w);
281 /// # }
282 /// # }));
283 /// ```
284 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
285 where
286 F: Fn(&V) -> bool + 'a,
287 {
288 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
289 KeyedStream {
290 underlying: self.underlying.filter(q!({
291 let orig = f;
292 move |(_k, v)| orig(v)
293 })),
294 _phantom_order: Default::default(),
295 }
296 }
297
298 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
299 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
300 ///
301 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
302 /// not modify or take ownership of the values. If you need to modify the values while filtering
303 /// use [`KeyedStream::filter_map_with_key`] instead.
304 ///
305 /// # Example
306 /// ```rust
307 /// # use hydro_lang::prelude::*;
308 /// # use futures::StreamExt;
309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310 /// process
311 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
312 /// .into_keyed()
313 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
314 /// # .entries()
315 /// # }, |mut stream| async move {
316 /// // { 1: [3], 2: [4] }
317 /// # for w in vec![(1, 3), (2, 4)] {
318 /// # assert_eq!(stream.next().await.unwrap(), w);
319 /// # }
320 /// # }));
321 /// ```
322 pub fn filter_with_key<F>(
323 self,
324 f: impl IntoQuotedMut<'a, F, L> + Copy,
325 ) -> KeyedStream<K, V, L, B, O, R>
326 where
327 F: Fn(&(K, V)) -> bool + 'a,
328 {
329 KeyedStream {
330 underlying: self.underlying.filter(f),
331 _phantom_order: Default::default(),
332 }
333 }
334
335 /// An operator that both filters and maps each value, with keys staying the same.
336 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
337 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
338 ///
339 /// # Example
340 /// ```rust
341 /// # use hydro_lang::prelude::*;
342 /// # use futures::StreamExt;
343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344 /// process
345 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
346 /// .into_keyed()
347 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
348 /// # .entries()
349 /// # }, |mut stream| async move {
350 /// // { 1: [2], 2: [4] }
351 /// # for w in vec![(1, 2), (2, 4)] {
352 /// # assert_eq!(stream.next().await.unwrap(), w);
353 /// # }
354 /// # }));
355 /// ```
356 pub fn filter_map<U, F>(
357 self,
358 f: impl IntoQuotedMut<'a, F, L> + Copy,
359 ) -> KeyedStream<K, U, L, B, O, R>
360 where
361 F: Fn(V) -> Option<U> + 'a,
362 {
363 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
364 KeyedStream {
365 underlying: self.underlying.filter_map(q!({
366 let orig = f;
367 move |(k, v)| orig(v).map(|o| (k, o))
368 })),
369 _phantom_order: Default::default(),
370 }
371 }
372
/// An operator that both filters and maps each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
375 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
376 ///
377 /// # Example
378 /// ```rust
379 /// # use hydro_lang::prelude::*;
380 /// # use futures::StreamExt;
381 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
382 /// process
383 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
384 /// .into_keyed()
385 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
386 /// # .entries()
387 /// # }, |mut stream| async move {
388 /// // { 2: [2] }
389 /// # for w in vec![(2, 2)] {
390 /// # assert_eq!(stream.next().await.unwrap(), w);
391 /// # }
392 /// # }));
393 /// ```
394 pub fn filter_map_with_key<U, F>(
395 self,
396 f: impl IntoQuotedMut<'a, F, L> + Copy,
397 ) -> KeyedStream<K, U, L, B, O, R>
398 where
399 F: Fn((K, V)) -> Option<U> + 'a,
400 K: Clone,
401 {
402 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
403 KeyedStream {
404 underlying: self.underlying.filter_map(q!({
405 let orig = f;
406 move |(k, v)| {
407 let out = orig((k.clone(), v));
408 out.map(|o| (k, o))
409 }
410 })),
411 _phantom_order: Default::default(),
412 }
413 }
414
415 /// For each value `v` in each group, transform `v` using `f` and then treat the
416 /// result as an [`Iterator`] to produce values one by one within the same group.
417 /// The implementation for [`Iterator`] for the output type `I` must produce items
418 /// in a **deterministic** order.
419 ///
420 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
421 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
422 ///
423 /// # Example
424 /// ```rust
425 /// # use hydro_lang::prelude::*;
426 /// # use futures::StreamExt;
427 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
428 /// process
429 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
430 /// .into_keyed()
431 /// .flat_map_ordered(q!(|x| x))
432 /// # .entries()
433 /// # }, |mut stream| async move {
434 /// // { 1: [2, 3, 4], 2: [5, 6] }
435 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
436 /// # assert_eq!(stream.next().await.unwrap(), w);
437 /// # }
438 /// # }));
439 /// ```
440 pub fn flat_map_ordered<U, I, F>(
441 self,
442 f: impl IntoQuotedMut<'a, F, L> + Copy,
443 ) -> KeyedStream<K, U, L, B, O, R>
444 where
445 I: IntoIterator<Item = U>,
446 F: Fn(V) -> I + 'a,
447 K: Clone,
448 {
449 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
450 KeyedStream {
451 underlying: self.underlying.flat_map_ordered(q!({
452 let orig = f;
453 move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
454 })),
455 _phantom_order: Default::default(),
456 }
457 }
458
459 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
460 /// for the output type `I` to produce items in any order.
461 ///
462 /// # Example
463 /// ```rust
464 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
465 /// # use futures::StreamExt;
466 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
467 /// process
468 /// .source_iter(q!(vec![
469 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
470 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
471 /// ]))
472 /// .into_keyed()
473 /// .flat_map_unordered(q!(|x| x))
474 /// # .entries()
475 /// # }, |mut stream| async move {
476 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
477 /// # let mut results = Vec::new();
478 /// # for _ in 0..4 {
479 /// # results.push(stream.next().await.unwrap());
480 /// # }
481 /// # results.sort();
482 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
483 /// # }));
484 /// ```
485 pub fn flat_map_unordered<U, I, F>(
486 self,
487 f: impl IntoQuotedMut<'a, F, L> + Copy,
488 ) -> KeyedStream<K, U, L, B, NoOrder, R>
489 where
490 I: IntoIterator<Item = U>,
491 F: Fn(V) -> I + 'a,
492 K: Clone,
493 {
494 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
495 KeyedStream {
496 underlying: self.underlying.flat_map_unordered(q!({
497 let orig = f;
498 move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
499 })),
500 _phantom_order: Default::default(),
501 }
502 }
503
504 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
505 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
506 /// items in a **deterministic** order.
507 ///
508 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
509 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
510 ///
511 /// # Example
512 /// ```rust
513 /// # use hydro_lang::prelude::*;
514 /// # use futures::StreamExt;
515 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
516 /// process
517 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
518 /// .into_keyed()
519 /// .flatten_ordered()
520 /// # .entries()
521 /// # }, |mut stream| async move {
522 /// // { 1: [2, 3, 4], 2: [5, 6] }
523 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
524 /// # assert_eq!(stream.next().await.unwrap(), w);
525 /// # }
526 /// # }));
527 /// ```
528 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
529 where
530 V: IntoIterator<Item = U>,
531 K: Clone,
532 {
533 self.flat_map_ordered(q!(|d| d))
534 }
535
536 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
537 /// for the value type `V` to produce items in any order.
538 ///
539 /// # Example
540 /// ```rust
541 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
542 /// # use futures::StreamExt;
543 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
544 /// process
545 /// .source_iter(q!(vec![
546 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
547 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
548 /// ]))
549 /// .into_keyed()
550 /// .flatten_unordered()
551 /// # .entries()
552 /// # }, |mut stream| async move {
553 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
554 /// # let mut results = Vec::new();
555 /// # for _ in 0..4 {
556 /// # results.push(stream.next().await.unwrap());
557 /// # }
558 /// # results.sort();
559 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
560 /// # }));
561 /// ```
562 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
563 where
564 V: IntoIterator<Item = U>,
565 K: Clone,
566 {
567 self.flat_map_unordered(q!(|d| d))
568 }
569
570 /// An operator which allows you to "inspect" each element of a stream without
571 /// modifying it. The closure `f` is called on a reference to each value. This is
572 /// mainly useful for debugging, and should not be used to generate side-effects.
573 ///
574 /// # Example
575 /// ```rust
576 /// # use hydro_lang::prelude::*;
577 /// # use futures::StreamExt;
578 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
579 /// process
580 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
581 /// .into_keyed()
582 /// .inspect(q!(|v| println!("{}", v)))
583 /// # .entries()
584 /// # }, |mut stream| async move {
585 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
586 /// # assert_eq!(stream.next().await.unwrap(), w);
587 /// # }
588 /// # }));
589 /// ```
590 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
591 where
592 F: Fn(&V) + 'a,
593 {
594 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
595 KeyedStream {
596 underlying: self.underlying.inspect(q!({
597 let orig = f;
598 move |(_k, v)| orig(v)
599 })),
600 _phantom_order: Default::default(),
601 }
602 }
603
604 /// An operator which allows you to "inspect" each element of a stream without
605 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
606 /// mainly useful for debugging, and should not be used to generate side-effects.
607 ///
608 /// # Example
609 /// ```rust
610 /// # use hydro_lang::prelude::*;
611 /// # use futures::StreamExt;
612 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613 /// process
614 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
615 /// .into_keyed()
616 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
617 /// # .entries()
618 /// # }, |mut stream| async move {
619 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
620 /// # assert_eq!(stream.next().await.unwrap(), w);
621 /// # }
622 /// # }));
623 /// ```
624 pub fn inspect_with_key<F>(
625 self,
626 f: impl IntoQuotedMut<'a, F, L>,
627 ) -> KeyedStream<K, V, L, B, O, R>
628 where
629 F: Fn(&(K, V)) + 'a,
630 {
631 KeyedStream {
632 underlying: self.underlying.inspect(f),
633 _phantom_order: Default::default(),
634 }
635 }
636
637 /// An operator which allows you to "name" a `HydroNode`.
638 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
639 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
640 {
641 let mut node = self.underlying.ir_node.borrow_mut();
642 let metadata = node.metadata_mut();
643 metadata.tag = Some(name.to_string());
644 }
645 self
646 }
647}
648
649impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
650 KeyedStream<K, V, L, Unbounded, O, R>
651{
652 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
653 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
654 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
655 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
656 ///
657 /// Currently, both input streams must be [`Unbounded`].
658 ///
659 /// # Example
660 /// ```rust
661 /// # use hydro_lang::prelude::*;
662 /// # use futures::StreamExt;
663 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
664 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
665 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
666 /// numbers1.interleave(numbers2)
667 /// # .entries()
668 /// # }, |mut stream| async move {
669 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
670 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
671 /// # assert_eq!(stream.next().await.unwrap(), w);
672 /// # }
673 /// # }));
674 /// ```
675 pub fn interleave<O2: Ordering, R2: Retries>(
676 self,
677 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
678 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
679 where
680 R: MinRetries<R2>,
681 {
682 self.entries().interleave(other.entries()).into_keyed()
683 }
684}
685
686/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
687/// control the processing of future elements.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs of this group.
    Yield(T),
    /// Emit the provided element as the _final_ element of this group; later inputs
    /// of the group are not processed.
    Return(T),
    /// Do not emit anything, but continue processing future inputs of this group.
    Continue,
    /// Do not emit anything, and do not process further inputs of this group.
    Break,
}
698
699impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
700where
701 K: Eq + Hash,
702 L: Location<'a>,
703{
704 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
705 ///
706 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
707 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
708 /// early by returning `None`.
709 ///
710 /// The function takes a mutable reference to the accumulator and the current element, and returns
711 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
/// If the function returns `None`, that group is terminated and no more of its elements are processed.
713 ///
714 /// # Example
715 /// ```rust
716 /// # use hydro_lang::prelude::*;
717 /// # use futures::StreamExt;
718 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719 /// process
720 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
721 /// .into_keyed()
722 /// .scan(
723 /// q!(|| 0),
724 /// q!(|acc, x| {
725 /// *acc += x;
726 /// if *acc % 2 == 0 { None } else { Some(*acc) }
727 /// }),
728 /// )
729 /// # .entries()
730 /// # }, |mut stream| async move {
731 /// // Output: { 0: [1], 1: [3, 7] }
732 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
733 /// # assert_eq!(stream.next().await.unwrap(), w);
734 /// # }
735 /// # }));
736 /// ```
737 pub fn scan<A, U, I, F>(
738 self,
739 init: impl IntoQuotedMut<'a, I, L> + Copy,
740 f: impl IntoQuotedMut<'a, F, L> + Copy,
741 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
742 where
743 K: Clone,
744 I: Fn() -> A + 'a,
745 F: Fn(&mut A, V) -> Option<U> + 'a,
746 {
747 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
748 self.generator(
749 init,
750 q!({
751 let orig = f;
752 move |state, v| {
753 if let Some(out) = orig(state, v) {
754 Generate::Yield(out)
755 } else {
756 Generate::Break
757 }
758 }
759 }),
760 )
761 }
762
763 /// Iteratively processes the elements in each group using a state machine that can yield
764 /// elements as it processes its inputs. This is designed to mirror the unstable generator
765 /// syntax in Rust, without requiring special syntax.
766 ///
767 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
768 /// state for each group. The second argument defines the processing logic, taking in a
769 /// mutable reference to the group's state and the value to be processed. It emits a
770 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
771 /// should be processed.
772 ///
773 /// # Example
774 /// ```rust
775 /// # use hydro_lang::prelude::*;
776 /// # use futures::StreamExt;
777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778 /// process
779 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
780 /// .into_keyed()
781 /// .generator(
782 /// q!(|| 0),
783 /// q!(|acc, x| {
784 /// *acc += x;
785 /// if *acc > 100 {
786 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
787 /// "done!".to_string()
788 /// )
789 /// } else if *acc % 2 == 0 {
790 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
791 /// "even".to_string()
792 /// )
793 /// } else {
794 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
795 /// }
796 /// }),
797 /// )
798 /// # .entries()
799 /// # }, |mut stream| async move {
800 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
801 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
802 /// # assert_eq!(stream.next().await.unwrap(), w);
803 /// # }
804 /// # }));
805 /// ```
806 pub fn generator<A, U, I, F>(
807 self,
808 init: impl IntoQuotedMut<'a, I, L> + Copy,
809 f: impl IntoQuotedMut<'a, F, L> + Copy,
810 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
811 where
812 K: Clone,
813 I: Fn() -> A + 'a,
814 F: Fn(&mut A, V) -> Generate<U> + 'a,
815 {
816 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
817 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
818 let underlying_scanned = self
819 .underlying
820 .assume_ordering(nondet!(
821 /** we do not rely on the order of keys */
822 ))
823 .scan(
824 q!(|| HashMap::new()),
825 q!(move |acc, (k, v)| {
826 let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
827 if let Some(existing_state_value) = existing_state {
828 match f(existing_state_value, v) {
829 Generate::Yield(out) => Some(Some((k, out))),
830 Generate::Return(out) => {
831 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
832 Some(Some((k, out)))
833 }
834 Generate::Break => {
835 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
836 Some(None)
837 }
838 Generate::Continue => Some(None),
839 }
840 } else {
841 Some(None)
842 }
843 }),
844 )
845 .flatten_ordered();
846
847 KeyedStream {
848 underlying: underlying_scanned.into(),
849 _phantom_order: Default::default(),
850 }
851 }
852
853 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
854 /// in-order across the values in each group. But the aggregation function returns a boolean,
855 /// which when true indicates that the aggregated result is complete and can be released to
856 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
857 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
858 /// normal stream elements.
859 ///
860 /// # Example
861 /// ```rust
862 /// # use hydro_lang::prelude::*;
863 /// # use futures::StreamExt;
864 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
865 /// process
866 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
867 /// .into_keyed()
868 /// .fold_early_stop(
869 /// q!(|| 0),
870 /// q!(|acc, x| {
871 /// *acc += x;
872 /// x % 2 == 0
873 /// }),
874 /// )
875 /// # .entries()
876 /// # }, |mut stream| async move {
877 /// // Output: { 0: 2, 1: 9 }
878 /// # for w in vec![(0, 2), (1, 9)] {
879 /// # assert_eq!(stream.next().await.unwrap(), w);
880 /// # }
881 /// # }));
882 /// ```
883 pub fn fold_early_stop<A, I, F>(
884 self,
885 init: impl IntoQuotedMut<'a, I, L> + Copy,
886 f: impl IntoQuotedMut<'a, F, L> + Copy,
887 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
888 where
889 K: Clone,
890 I: Fn() -> A + 'a,
891 F: Fn(&mut A, V) -> bool + 'a,
892 {
893 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
894 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
895 let out_without_bound_cast = self
896 .generator(
897 q!(move || Some(init())),
898 q!(move |key_state, v| {
899 if let Some(key_state_value) = key_state.as_mut() {
900 if f(key_state_value, v) {
901 Generate::Return(key_state.take().unwrap())
902 } else {
903 Generate::Continue
904 }
905 } else {
906 unreachable!()
907 }
908 }),
909 )
910 .underlying;
911
912 KeyedSingleton {
913 underlying: out_without_bound_cast,
914 }
915 }
916
917 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
918 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
919 /// otherwise the first element would be non-deterministic.
920 ///
921 /// # Example
922 /// ```rust
923 /// # use hydro_lang::prelude::*;
924 /// # use futures::StreamExt;
925 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
926 /// process
927 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
928 /// .into_keyed()
929 /// .first()
930 /// # .entries()
931 /// # }, |mut stream| async move {
932 /// // Output: { 0: 2, 1: 3 }
933 /// # for w in vec![(0, 2), (1, 3)] {
934 /// # assert_eq!(stream.next().await.unwrap(), w);
935 /// # }
936 /// # }));
937 /// ```
938 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
939 where
940 K: Clone,
941 {
942 self.fold_early_stop(
943 q!(|| None),
944 q!(|acc, v| {
945 *acc = Some(v);
946 true
947 }),
948 )
949 .map(q!(|v| v.unwrap()))
950 }
951
952 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
953 ///
954 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
955 /// to depend on the order of elements in the group.
956 ///
957 /// If the input and output value types are the same and do not require initialization then use
958 /// [`KeyedStream::reduce`].
959 ///
960 /// # Example
961 /// ```rust
962 /// # use hydro_lang::prelude::*;
963 /// # use futures::StreamExt;
964 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
965 /// let tick = process.tick();
966 /// let numbers = process
967 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
968 /// .into_keyed();
969 /// let batch = numbers.batch(&tick, nondet!(/** test */));
970 /// batch
971 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
972 /// .entries()
973 /// .all_ticks()
974 /// # }, |mut stream| async move {
975 /// // (1, 5), (2, 7)
976 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
977 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
978 /// # }));
979 /// ```
980 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
981 self,
982 init: impl IntoQuotedMut<'a, I, L>,
983 comb: impl IntoQuotedMut<'a, F, L>,
984 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
985 let init = init.splice_fn0_ctx(&self.underlying.location).into();
986 let comb = comb
987 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
988 .into();
989
990 let out_ir = HydroNode::FoldKeyed {
991 init,
992 acc: comb,
993 input: Box::new(self.underlying.ir_node.into_inner()),
994 metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
995 };
996
997 KeyedSingleton {
998 underlying: Stream::new(self.underlying.location, out_ir),
999 }
1000 }
1001
1002 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1003 ///
1004 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1005 /// to depend on the order of elements in the stream.
1006 ///
1007 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1008 ///
1009 /// # Example
1010 /// ```rust
1011 /// # use hydro_lang::prelude::*;
1012 /// # use futures::StreamExt;
1013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014 /// let tick = process.tick();
1015 /// let numbers = process
1016 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1017 /// .into_keyed();
1018 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1019 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1020 /// # }, |mut stream| async move {
1021 /// // (1, 5), (2, 7)
1022 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1023 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1024 /// # }));
1025 /// ```
1026 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1027 self,
1028 comb: impl IntoQuotedMut<'a, F, L>,
1029 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1030 let f = comb
1031 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1032 .into();
1033
1034 let out_ir = HydroNode::ReduceKeyed {
1035 f,
1036 input: Box::new(self.underlying.ir_node.into_inner()),
1037 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1038 };
1039
1040 KeyedSingleton {
1041 underlying: Stream::new(self.underlying.location, out_ir),
1042 }
1043 }
1044
1045 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1046 ///
1047 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1048 /// to depend on the order of elements in the stream.
1049 ///
1050 /// # Example
1051 /// ```rust
1052 /// # use hydro_lang::prelude::*;
1053 /// # use futures::StreamExt;
1054 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1055 /// let tick = process.tick();
1056 /// let watermark = tick.singleton(q!(1));
1057 /// let numbers = process
1058 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1059 /// .into_keyed();
1060 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1061 /// batch
1062 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1063 /// .entries()
1064 /// .all_ticks()
1065 /// # }, |mut stream| async move {
1066 /// // (2, 204)
1067 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1068 /// # }));
1069 /// ```
1070 pub fn reduce_watermark<O, F>(
1071 self,
1072 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1073 comb: impl IntoQuotedMut<'a, F, L>,
1074 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1075 where
1076 O: Clone,
1077 F: Fn(&mut V, V) + 'a,
1078 {
1079 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1080 check_matching_location(&self.underlying.location.root(), other.location.outer());
1081 let f = comb
1082 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1083 .into();
1084
1085 let out_ir = Stream::new(
1086 self.underlying.location.clone(),
1087 HydroNode::ReduceKeyedWatermark {
1088 f,
1089 input: Box::new(self.underlying.ir_node.into_inner()),
1090 watermark: Box::new(other.ir_node.into_inner()),
1091 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1092 },
1093 );
1094
1095 KeyedSingleton { underlying: out_ir }
1096 }
1097}
1098
1099impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1100where
1101 K: Eq + Hash,
1102 L: Location<'a>,
1103{
1104 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1105 ///
1106 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1107 ///
1108 /// If the input and output value types are the same and do not require initialization then use
1109 /// [`KeyedStream::reduce_commutative`].
1110 ///
1111 /// # Example
1112 /// ```rust
1113 /// # use hydro_lang::prelude::*;
1114 /// # use futures::StreamExt;
1115 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1116 /// let tick = process.tick();
1117 /// let numbers = process
1118 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1119 /// .into_keyed();
1120 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1121 /// batch
1122 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1123 /// .entries()
1124 /// .all_ticks()
1125 /// # }, |mut stream| async move {
1126 /// // (1, 5), (2, 7)
1127 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1128 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1129 /// # }));
1130 /// ```
1131 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1132 self,
1133 init: impl IntoQuotedMut<'a, I, L>,
1134 comb: impl IntoQuotedMut<'a, F, L>,
1135 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1136 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1137 .fold(init, comb)
1138 }
1139
1140 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1141 ///
1142 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1143 ///
1144 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1145 ///
1146 /// # Example
1147 /// ```rust
1148 /// # use hydro_lang::prelude::*;
1149 /// # use futures::StreamExt;
1150 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1151 /// let tick = process.tick();
1152 /// let numbers = process
1153 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1154 /// .into_keyed();
1155 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1156 /// batch
1157 /// .reduce_commutative(q!(|acc, x| *acc += x))
1158 /// .entries()
1159 /// .all_ticks()
1160 /// # }, |mut stream| async move {
1161 /// // (1, 5), (2, 7)
1162 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1163 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1164 /// # }));
1165 /// ```
1166 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1167 self,
1168 comb: impl IntoQuotedMut<'a, F, L>,
1169 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1170 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1171 .reduce(comb)
1172 }
1173
1174 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1175 ///
1176 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1177 ///
1178 /// # Example
1179 /// ```rust
1180 /// # use hydro_lang::prelude::*;
1181 /// # use futures::StreamExt;
1182 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1183 /// let tick = process.tick();
1184 /// let watermark = tick.singleton(q!(1));
1185 /// let numbers = process
1186 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1187 /// .into_keyed();
1188 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1189 /// batch
1190 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1191 /// .entries()
1192 /// .all_ticks()
1193 /// # }, |mut stream| async move {
1194 /// // (2, 204)
1195 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1196 /// # }));
1197 /// ```
1198 pub fn reduce_watermark_commutative<O2, F>(
1199 self,
1200 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1201 comb: impl IntoQuotedMut<'a, F, L>,
1202 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1203 where
1204 O2: Clone,
1205 F: Fn(&mut V, V) + 'a,
1206 {
1207 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1208 .reduce_watermark(other, comb)
1209 }
1210}
1211
1212impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1213where
1214 K: Eq + Hash,
1215 L: Location<'a>,
1216{
1217 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1218 ///
1219 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1220 ///
1221 /// If the input and output value types are the same and do not require initialization then use
1222 /// [`KeyedStream::reduce_idempotent`].
1223 ///
1224 /// # Example
1225 /// ```rust
1226 /// # use hydro_lang::prelude::*;
1227 /// # use futures::StreamExt;
1228 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229 /// let tick = process.tick();
1230 /// let numbers = process
1231 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1232 /// .into_keyed();
1233 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1234 /// batch
1235 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1236 /// .entries()
1237 /// .all_ticks()
1238 /// # }, |mut stream| async move {
1239 /// // (1, false), (2, true)
1240 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1241 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1242 /// # }));
1243 /// ```
1244 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1245 self,
1246 init: impl IntoQuotedMut<'a, I, L>,
1247 comb: impl IntoQuotedMut<'a, F, L>,
1248 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1249 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1250 .fold(init, comb)
1251 }
1252
1253 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1254 ///
1255 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1256 ///
1257 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1258 ///
1259 /// # Example
1260 /// ```rust
1261 /// # use hydro_lang::prelude::*;
1262 /// # use futures::StreamExt;
1263 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1264 /// let tick = process.tick();
1265 /// let numbers = process
1266 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1267 /// .into_keyed();
1268 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1269 /// batch
1270 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1271 /// .entries()
1272 /// .all_ticks()
1273 /// # }, |mut stream| async move {
1274 /// // (1, false), (2, true)
1275 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1276 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1277 /// # }));
1278 /// ```
1279 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1280 self,
1281 comb: impl IntoQuotedMut<'a, F, L>,
1282 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1283 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1284 .reduce(comb)
1285 }
1286
1287 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1288 ///
1289 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1290 ///
1291 /// # Example
1292 /// ```rust
1293 /// # use hydro_lang::prelude::*;
1294 /// # use futures::StreamExt;
1295 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296 /// let tick = process.tick();
1297 /// let watermark = tick.singleton(q!(1));
1298 /// let numbers = process
1299 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1300 /// .into_keyed();
1301 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1302 /// batch
1303 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1304 /// .entries()
1305 /// .all_ticks()
1306 /// # }, |mut stream| async move {
1307 /// // (2, true)
1308 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1309 /// # }));
1310 /// ```
1311 pub fn reduce_watermark_idempotent<O2, F>(
1312 self,
1313 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1314 comb: impl IntoQuotedMut<'a, F, L>,
1315 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1316 where
1317 O2: Clone,
1318 F: Fn(&mut V, V) + 'a,
1319 {
1320 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1321 .reduce_watermark(other, comb)
1322 }
1323}
1324
1325impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1326where
1327 K: Eq + Hash,
1328 L: Location<'a>,
1329{
1330 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1331 ///
1332 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1333 /// as there may be non-deterministic duplicates.
1334 ///
1335 /// If the input and output value types are the same and do not require initialization then use
1336 /// [`KeyedStream::reduce_commutative_idempotent`].
1337 ///
1338 /// # Example
1339 /// ```rust
1340 /// # use hydro_lang::prelude::*;
1341 /// # use futures::StreamExt;
1342 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1343 /// let tick = process.tick();
1344 /// let numbers = process
1345 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1346 /// .into_keyed();
1347 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1348 /// batch
1349 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1350 /// .entries()
1351 /// .all_ticks()
1352 /// # }, |mut stream| async move {
1353 /// // (1, false), (2, true)
1354 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1355 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1356 /// # }));
1357 /// ```
1358 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1359 self,
1360 init: impl IntoQuotedMut<'a, I, L>,
1361 comb: impl IntoQuotedMut<'a, F, L>,
1362 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1363 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1364 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1365 .fold(init, comb)
1366 }
1367
1368 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1369 ///
1370 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1371 /// as there may be non-deterministic duplicates.
1372 ///
1373 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1374 ///
1375 /// # Example
1376 /// ```rust
1377 /// # use hydro_lang::prelude::*;
1378 /// # use futures::StreamExt;
1379 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1380 /// let tick = process.tick();
1381 /// let numbers = process
1382 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1383 /// .into_keyed();
1384 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1385 /// batch
1386 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1387 /// .entries()
1388 /// .all_ticks()
1389 /// # }, |mut stream| async move {
1390 /// // (1, false), (2, true)
1391 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1392 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1393 /// # }));
1394 /// ```
1395 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1396 self,
1397 comb: impl IntoQuotedMut<'a, F, L>,
1398 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1399 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1400 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1401 .reduce(comb)
1402 }
1403
1404 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1405 ///
1406 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1407 /// as there may be non-deterministic duplicates.
1408 ///
1409 /// # Example
1410 /// ```rust
1411 /// # use hydro_lang::prelude::*;
1412 /// # use futures::StreamExt;
1413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1414 /// let tick = process.tick();
1415 /// let watermark = tick.singleton(q!(1));
1416 /// let numbers = process
1417 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1418 /// .into_keyed();
1419 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1420 /// batch
1421 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1422 /// .entries()
1423 /// .all_ticks()
1424 /// # }, |mut stream| async move {
1425 /// // (2, true)
1426 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1427 /// # }));
1428 /// ```
1429 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1430 self,
1431 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1432 comb: impl IntoQuotedMut<'a, F, L>,
1433 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1434 where
1435 O2: Clone,
1436 F: Fn(&mut V, V) + 'a,
1437 {
1438 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1439 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1440 .reduce_watermark(other, comb)
1441 }
1442
1443 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1444 /// whose keys are not in the bounded stream.
1445 ///
1446 /// # Example
1447 /// ```rust
1448 /// # use hydro_lang::prelude::*;
1449 /// # use futures::StreamExt;
1450 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1451 /// let tick = process.tick();
1452 /// let keyed_stream = process
1453 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1454 /// .batch(&tick, nondet!(/** test */))
1455 /// .into_keyed();
1456 /// let keys_to_remove = process
1457 /// .source_iter(q!(vec![1, 2]))
1458 /// .batch(&tick, nondet!(/** test */));
1459 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1460 /// # .entries()
1461 /// # }, |mut stream| async move {
1462 /// // { 3: ['c'], 4: ['d'] }
1463 /// # for w in vec![(3, 'c'), (4, 'd')] {
1464 /// # assert_eq!(stream.next().await.unwrap(), w);
1465 /// # }
1466 /// # }));
1467 /// ```
1468 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1469 self,
1470 other: Stream<K, L, Bounded, O2, R2>,
1471 ) -> Self {
1472 KeyedStream {
1473 underlying: self.entries().anti_join(other),
1474 _phantom_order: Default::default(),
1475 }
1476 }
1477}
1478
1479impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1480where
1481 L: Location<'a> + NoTick + NoAtomic,
1482{
1483 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1484 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1485 ///
1486 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1487 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1488 /// argument that declares where the stream will be atomically processed. Batching a stream into
1489 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1490 /// [`Tick`] will introduce asynchrony.
1491 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1492 KeyedStream {
1493 underlying: self.underlying.atomic(tick),
1494 _phantom_order: Default::default(),
1495 }
1496 }
1497
1498 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1499 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1500 /// the order of the input.
1501 ///
1502 /// # Non-Determinism
1503 /// The batch boundaries are non-deterministic and may change across executions.
1504 pub fn batch(
1505 self,
1506 tick: &Tick<L>,
1507 nondet: NonDet,
1508 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1509 self.atomic(tick).batch(nondet)
1510 }
1511}
1512
1513impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1514where
1515 L: Location<'a> + NoTick + NoAtomic,
1516{
1517 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1518 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1519 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1520 /// used to create the atomic section.
1521 ///
1522 /// # Non-Determinism
1523 /// The batch boundaries are non-deterministic and may change across executions.
1524 pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1525 KeyedStream {
1526 underlying: self.underlying.batch(nondet),
1527 _phantom_order: Default::default(),
1528 }
1529 }
1530
1531 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1532 /// See [`KeyedStream::atomic`] for more details.
1533 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1534 KeyedStream {
1535 underlying: self.underlying.end_atomic(),
1536 _phantom_order: Default::default(),
1537 }
1538 }
1539}
1540
1541impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1542where
1543 L: Location<'a>,
1544{
1545 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1546 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1547 /// is only present in one of the inputs, its values are passed through as-is). The output has
1548 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1549 ///
1550 /// Currently, both input streams must be [`Bounded`]. This operator will block
1551 /// on the first stream until all its elements are available. In a future version,
1552 /// we will relax the requirement on the `other` stream.
1553 ///
1554 /// # Example
1555 /// ```rust
1556 /// # use hydro_lang::prelude::*;
1557 /// # use futures::StreamExt;
1558 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1559 /// let tick = process.tick();
1560 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1561 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1562 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1563 /// # .entries()
1564 /// # }, |mut stream| async move {
1565 /// // { 0: [2, 1], 1: [4, 3] }
1566 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1567 /// # assert_eq!(stream.next().await.unwrap(), w);
1568 /// # }
1569 /// # }));
1570 /// ```
1571 pub fn chain<O2: Ordering>(
1572 self,
1573 other: KeyedStream<K, V, L, Bounded, O2, R>,
1574 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1575 where
1576 O: MinOrder<O2>,
1577 {
1578 KeyedStream {
1579 underlying: self.underlying.chain(other.underlying),
1580 _phantom_order: Default::default(),
1581 }
1582 }
1583}
1584
1585impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1586where
1587 L: Location<'a>,
1588{
1589 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1590 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1591 /// each key.
1592 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1593 KeyedStream {
1594 underlying: self.underlying.all_ticks(),
1595 _phantom_order: Default::default(),
1596 }
1597 }
1598
1599 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1600 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1601 /// each key.
1602 ///
1603 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1604 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1605 /// stream's [`Tick`] context.
1606 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1607 KeyedStream {
1608 underlying: self.underlying.all_ticks(),
1609 _phantom_order: Default::default(),
1610 }
1611 }
1612
1613 #[expect(missing_docs, reason = "TODO")]
1614 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1615 KeyedStream {
1616 underlying: self.underlying.defer_tick(),
1617 _phantom_order: Default::default(),
1618 }
1619 }
1620}
1621
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Runs `reduce_watermark` with a constant watermark of `1` over a fixed input on a
    /// localhost deployment and checks that the first entry received by the external process
    /// is `(2, 204)`.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        // The watermark is a constant singleton living in the node's tick.
        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        // Sum the values per key under the watermark, snapshot the result in the tick, and
        // ship the entries to the external process.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output after deploying and before starting the deployment.
        let mut out = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // The two values of key 2 are combined: 102 + 102.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that starts at `1` and grows by one
    /// on every tick, and checks the entries received externally: first `(2, 204)`, then, after
    /// a trigger message is sent, `(3, 103)`.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // Channel through which the test can send trigger messages into the node.
        let (tick_send, tick_trigger) = node.source_external_bincode(&external);

        // Build the watermark as a cycle: it is initialized to 1 and the value for the next
        // tick is the current value plus one.
        let node_tick = node.tick();
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // The extra entry `(3, 103)` is gated on the first element of the trigger batch.
        // NOTE(review): presumably `filter_if_some` only lets the batch through in ticks where
        // a trigger message arrived — confirm against its definition.
        let tick_triggered_input = node
            .source_iter(q!([(3, 103)]))
            .batch(&node_tick, nondet!(/** test */))
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        // Interleave the fixed input with the triggered input, sum the values per key under
        // the moving watermark, and ship snapshots of the entries to the external process.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both directions after deploying and before starting the deployment.
        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
        let mut out_recv = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // The two values of key 2 are combined: 102 + 102.
        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        // Sending a trigger message admits the `(3, 103)` entry.
        tick_send.send(()).await.unwrap();

        // The next entry observed is the triggered one; `(2, 204)` is not seen again here.
        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
}