hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location at which this keyed stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this keyed stream. It is held in a [`RefCell`] so that it can be
    /// replaced through a shared reference, which `Clone::clone` relies on when it swaps the node
    /// for a tee.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Ties the type parameters that no field stores by value (`K`, `V`, `Bound`, `Order`,
    /// `Retry`) to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65 for KeyedStream<K, V, L, B, NoOrder, R>
66where
67 L: Location<'a>,
68{
69 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70 KeyedStream {
71 location: stream.location,
72 ir_node: stream.ir_node,
73 _phantom: PhantomData,
74 }
75 }
76}
77
/// Implements [`DeferTick`] for bounded keyed streams that live inside a [`Tick`].
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards to `KeyedStream::defer_tick`, which is not defined in this part of the file
        // (presumably the inherent method of the same name; TODO confirm).
        KeyedStream::defer_tick(self)
    }
}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90 L: Location<'a>,
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 KeyedStream {
96 location: location.clone(),
97 ir_node: RefCell::new(HydroNode::CycleSource {
98 ident,
99 metadata: location.new_node_metadata(
100 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101 ),
102 }),
103 _phantom: PhantomData,
104 }
105 }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132 for KeyedStream<K, V, L, B, O, R>
133where
134 L: Location<'a> + NoTick,
135{
136 type Location = L;
137
138 fn create_source(ident: syn::Ident, location: L) -> Self {
139 KeyedStream {
140 location: location.clone(),
141 ir_node: RefCell::new(HydroNode::CycleSource {
142 ident,
143 metadata: location
144 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152 for KeyedStream<K, V, L, B, O, R>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176 fn clone(&self) -> Self {
177 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179 *self.ir_node.borrow_mut() = HydroNode::Tee {
180 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181 metadata: self.location.new_node_metadata(Self::collection_kind()),
182 };
183 }
184
185 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186 KeyedStream {
187 location: self.location.clone(),
188 ir_node: HydroNode::Tee {
189 inner: TeeNode(inner.0.clone()),
190 metadata: metadata.clone(),
191 }
192 .into(),
193 _phantom: PhantomData,
194 }
195 } else {
196 unreachable!()
197 }
198 }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202 KeyedStream<K, V, L, B, O, R>
203{
204 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208 KeyedStream {
209 location,
210 ir_node: RefCell::new(ir_node),
211 _phantom: PhantomData,
212 }
213 }
214
215 /// Returns the [`CollectionKind`] corresponding to this type.
216 pub fn collection_kind() -> CollectionKind {
217 CollectionKind::KeyedStream {
218 bound: B::BOUND_KIND,
219 value_order: O::ORDERING_KIND,
220 value_retry: R::RETRIES_KIND,
221 key_type: stageleft::quote_type::<K>().into(),
222 value_type: stageleft::quote_type::<V>().into(),
223 }
224 }
225
/// Returns the [`Location`] where this keyed stream is being materialized.
///
/// The location is returned by reference; the keyed stream itself is not consumed.
pub fn location(&self) -> &L {
    &self.location
}
230
231 /// Explicitly "casts" the keyed stream to a type with a different ordering
232 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233 /// by the type-system.
234 ///
235 /// # Non-Determinism
236 /// This function is used as an escape hatch, and any mistakes in the
237 /// provided ordering guarantee will propagate into the guarantees
238 /// for the rest of the program.
239 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240 if O::ORDERING_KIND == O2::ORDERING_KIND {
241 KeyedStream::new(self.location, self.ir_node.into_inner())
242 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243 // We can always weaken the ordering guarantee
244 KeyedStream::new(
245 self.location.clone(),
246 HydroNode::Cast {
247 inner: Box::new(self.ir_node.into_inner()),
248 metadata: self
249 .location
250 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251 },
252 )
253 } else {
254 KeyedStream::new(
255 self.location.clone(),
256 HydroNode::ObserveNonDet {
257 inner: Box::new(self.ir_node.into_inner()),
258 metadata: self
259 .location
260 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
261 },
262 )
263 }
264 }
265
266 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
267 /// which is always safe because that is the weakest possible guarantee.
268 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
269 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
270 self.assume_ordering::<NoOrder>(nondet)
271 }
272
273 /// Explicitly "casts" the keyed stream to a type with a different retries
274 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
275 /// be proven by the type-system.
276 ///
277 /// # Non-Determinism
278 /// This function is used as an escape hatch, and any mistakes in the
279 /// provided retries guarantee will propagate into the guarantees
280 /// for the rest of the program.
281 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
282 if R::RETRIES_KIND == R2::RETRIES_KIND {
283 KeyedStream::new(self.location, self.ir_node.into_inner())
284 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
285 // We can always weaken the retries guarantee
286 KeyedStream::new(
287 self.location.clone(),
288 HydroNode::Cast {
289 inner: Box::new(self.ir_node.into_inner()),
290 metadata: self
291 .location
292 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
293 },
294 )
295 } else {
296 KeyedStream::new(
297 self.location.clone(),
298 HydroNode::ObserveNonDet {
299 inner: Box::new(self.ir_node.into_inner()),
300 metadata: self
301 .location
302 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
303 },
304 )
305 }
306 }
307
308 /// Flattens the keyed stream into an unordered stream of key-value pairs.
309 ///
310 /// # Example
311 /// ```rust
312 /// # use hydro_lang::prelude::*;
313 /// # use futures::StreamExt;
314 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
315 /// process
316 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
317 /// .into_keyed()
318 /// .entries()
319 /// # }, |mut stream| async move {
320 /// // (1, 2), (1, 3), (2, 4) in any order
321 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
322 /// # assert_eq!(stream.next().await.unwrap(), w);
323 /// # }
324 /// # }));
325 /// ```
326 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
327 Stream::new(
328 self.location.clone(),
329 HydroNode::Cast {
330 inner: Box::new(self.ir_node.into_inner()),
331 metadata: self
332 .location
333 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
334 },
335 )
336 }
337
338 /// Flattens the keyed stream into an unordered stream of only the values.
339 ///
340 /// # Example
341 /// ```rust
342 /// # use hydro_lang::prelude::*;
343 /// # use futures::StreamExt;
344 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
345 /// process
346 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
347 /// .into_keyed()
348 /// .values()
349 /// # }, |mut stream| async move {
350 /// // 2, 3, 4 in any order
351 /// # for w in vec![2, 3, 4] {
352 /// # assert_eq!(stream.next().await.unwrap(), w);
353 /// # }
354 /// # }));
355 /// ```
pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
    // Flatten into `(K, V)` pairs first, then keep only the value half of each pair.
    self.entries().map(q!(|(_, v)| v))
}
359
360 /// Transforms each value by invoking `f` on each element, with keys staying the same
361 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
362 ///
363 /// If you do not want to modify the stream and instead only want to view
364 /// each item use [`KeyedStream::inspect`] instead.
365 ///
366 /// # Example
367 /// ```rust
368 /// # use hydro_lang::prelude::*;
369 /// # use futures::StreamExt;
370 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
371 /// process
372 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
373 /// .into_keyed()
374 /// .map(q!(|v| v + 1))
375 /// # .entries()
376 /// # }, |mut stream| async move {
377 /// // { 1: [3, 4], 2: [5] }
378 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
379 /// # assert_eq!(stream.next().await.unwrap(), w);
380 /// # }
381 /// # }));
382 /// ```
383 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
384 where
385 F: Fn(V) -> U + 'a,
386 {
387 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
388 let map_f = q!({
389 let orig = f;
390 move |(k, v)| (k, orig(v))
391 })
392 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
393 .into();
394
395 KeyedStream::new(
396 self.location.clone(),
397 HydroNode::Map {
398 f: map_f,
399 input: Box::new(self.ir_node.into_inner()),
400 metadata: self
401 .location
402 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
403 },
404 )
405 }
406
/// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
409 ///
410 /// If you do not want to modify the stream and instead only want to view
411 /// each item use [`KeyedStream::inspect_with_key`] instead.
412 ///
413 /// # Example
414 /// ```rust
415 /// # use hydro_lang::prelude::*;
416 /// # use futures::StreamExt;
417 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418 /// process
419 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
420 /// .into_keyed()
421 /// .map_with_key(q!(|(k, v)| k + v))
422 /// # .entries()
423 /// # }, |mut stream| async move {
424 /// // { 1: [3, 4], 2: [6] }
425 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
426 /// # assert_eq!(stream.next().await.unwrap(), w);
427 /// # }
428 /// # }));
429 /// ```
430 pub fn map_with_key<U, F>(
431 self,
432 f: impl IntoQuotedMut<'a, F, L> + Copy,
433 ) -> KeyedStream<K, U, L, B, O, R>
434 where
435 F: Fn((K, V)) -> U + 'a,
436 K: Clone,
437 {
438 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
439 let map_f = q!({
440 let orig = f;
441 move |(k, v)| {
442 let out = orig((Clone::clone(&k), v));
443 (k, out)
444 }
445 })
446 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
447 .into();
448
449 KeyedStream::new(
450 self.location.clone(),
451 HydroNode::Map {
452 f: map_f,
453 input: Box::new(self.ir_node.into_inner()),
454 metadata: self
455 .location
456 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
457 },
458 )
459 }
460
461 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
462 /// `f`, preserving the order of the elements within the group.
463 ///
464 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
465 /// not modify or take ownership of the values. If you need to modify the values while filtering
466 /// use [`KeyedStream::filter_map`] instead.
467 ///
468 /// # Example
469 /// ```rust
470 /// # use hydro_lang::prelude::*;
471 /// # use futures::StreamExt;
472 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473 /// process
474 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
475 /// .into_keyed()
476 /// .filter(q!(|&x| x > 2))
477 /// # .entries()
478 /// # }, |mut stream| async move {
479 /// // { 1: [3], 2: [4] }
480 /// # for w in vec![(1, 3), (2, 4)] {
481 /// # assert_eq!(stream.next().await.unwrap(), w);
482 /// # }
483 /// # }));
484 /// ```
485 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
486 where
487 F: Fn(&V) -> bool + 'a,
488 {
489 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
490 let filter_f = q!({
491 let orig = f;
492 move |t: &(_, _)| orig(&t.1)
493 })
494 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
495 .into();
496
497 KeyedStream::new(
498 self.location.clone(),
499 HydroNode::Filter {
500 f: filter_f,
501 input: Box::new(self.ir_node.into_inner()),
502 metadata: self.location.new_node_metadata(Self::collection_kind()),
503 },
504 )
505 }
506
507 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
508 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
509 ///
510 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
511 /// not modify or take ownership of the values. If you need to modify the values while filtering
512 /// use [`KeyedStream::filter_map_with_key`] instead.
513 ///
514 /// # Example
515 /// ```rust
516 /// # use hydro_lang::prelude::*;
517 /// # use futures::StreamExt;
518 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
519 /// process
520 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
521 /// .into_keyed()
522 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
523 /// # .entries()
524 /// # }, |mut stream| async move {
525 /// // { 1: [3], 2: [4] }
526 /// # for w in vec![(1, 3), (2, 4)] {
527 /// # assert_eq!(stream.next().await.unwrap(), w);
528 /// # }
529 /// # }));
530 /// ```
531 pub fn filter_with_key<F>(
532 self,
533 f: impl IntoQuotedMut<'a, F, L> + Copy,
534 ) -> KeyedStream<K, V, L, B, O, R>
535 where
536 F: Fn(&(K, V)) -> bool + 'a,
537 {
538 let filter_f = f
539 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
540 .into();
541
542 KeyedStream::new(
543 self.location.clone(),
544 HydroNode::Filter {
545 f: filter_f,
546 input: Box::new(self.ir_node.into_inner()),
547 metadata: self.location.new_node_metadata(Self::collection_kind()),
548 },
549 )
550 }
551
552 /// An operator that both filters and maps each value, with keys staying the same.
553 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
554 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
555 ///
556 /// # Example
557 /// ```rust
558 /// # use hydro_lang::prelude::*;
559 /// # use futures::StreamExt;
560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561 /// process
562 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
563 /// .into_keyed()
564 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
565 /// # .entries()
566 /// # }, |mut stream| async move {
567 /// // { 1: [2], 2: [4] }
568 /// # for w in vec![(1, 2), (2, 4)] {
569 /// # assert_eq!(stream.next().await.unwrap(), w);
570 /// # }
571 /// # }));
572 /// ```
573 pub fn filter_map<U, F>(
574 self,
575 f: impl IntoQuotedMut<'a, F, L> + Copy,
576 ) -> KeyedStream<K, U, L, B, O, R>
577 where
578 F: Fn(V) -> Option<U> + 'a,
579 {
580 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
581 let filter_map_f = q!({
582 let orig = f;
583 move |(k, v)| orig(v).map(|o| (k, o))
584 })
585 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
586 .into();
587
588 KeyedStream::new(
589 self.location.clone(),
590 HydroNode::FilterMap {
591 f: filter_map_f,
592 input: Box::new(self.ir_node.into_inner()),
593 metadata: self
594 .location
595 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
596 },
597 )
598 }
599
/// An operator that both filters and maps each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
602 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
603 ///
604 /// # Example
605 /// ```rust
606 /// # use hydro_lang::prelude::*;
607 /// # use futures::StreamExt;
608 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
609 /// process
610 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
611 /// .into_keyed()
612 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
613 /// # .entries()
614 /// # }, |mut stream| async move {
615 /// // { 2: [2] }
616 /// # for w in vec![(2, 2)] {
617 /// # assert_eq!(stream.next().await.unwrap(), w);
618 /// # }
619 /// # }));
620 /// ```
621 pub fn filter_map_with_key<U, F>(
622 self,
623 f: impl IntoQuotedMut<'a, F, L> + Copy,
624 ) -> KeyedStream<K, U, L, B, O, R>
625 where
626 F: Fn((K, V)) -> Option<U> + 'a,
627 K: Clone,
628 {
629 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
630 let filter_map_f = q!({
631 let orig = f;
632 move |(k, v)| {
633 let out = orig((Clone::clone(&k), v));
634 out.map(|o| (k, o))
635 }
636 })
637 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
638 .into();
639
640 KeyedStream::new(
641 self.location.clone(),
642 HydroNode::FilterMap {
643 f: filter_map_f,
644 input: Box::new(self.ir_node.into_inner()),
645 metadata: self
646 .location
647 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
648 },
649 )
650 }
651
652 /// For each value `v` in each group, transform `v` using `f` and then treat the
653 /// result as an [`Iterator`] to produce values one by one within the same group.
654 /// The implementation for [`Iterator`] for the output type `I` must produce items
655 /// in a **deterministic** order.
656 ///
657 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
658 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
659 ///
660 /// # Example
661 /// ```rust
662 /// # use hydro_lang::prelude::*;
663 /// # use futures::StreamExt;
664 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
665 /// process
666 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
667 /// .into_keyed()
668 /// .flat_map_ordered(q!(|x| x))
669 /// # .entries()
670 /// # }, |mut stream| async move {
671 /// // { 1: [2, 3, 4], 2: [5, 6] }
672 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
673 /// # assert_eq!(stream.next().await.unwrap(), w);
674 /// # }
675 /// # }));
676 /// ```
677 pub fn flat_map_ordered<U, I, F>(
678 self,
679 f: impl IntoQuotedMut<'a, F, L> + Copy,
680 ) -> KeyedStream<K, U, L, B, O, R>
681 where
682 I: IntoIterator<Item = U>,
683 F: Fn(V) -> I + 'a,
684 K: Clone,
685 {
686 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
687 let flat_map_f = q!({
688 let orig = f;
689 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
690 })
691 .splice_fn1_ctx::<(K, V), _>(&self.location)
692 .into();
693
694 KeyedStream::new(
695 self.location.clone(),
696 HydroNode::FlatMap {
697 f: flat_map_f,
698 input: Box::new(self.ir_node.into_inner()),
699 metadata: self
700 .location
701 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
702 },
703 )
704 }
705
706 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
707 /// for the output type `I` to produce items in any order.
708 ///
709 /// # Example
710 /// ```rust
711 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
714 /// process
715 /// .source_iter(q!(vec![
716 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
717 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
718 /// ]))
719 /// .into_keyed()
720 /// .flat_map_unordered(q!(|x| x))
721 /// # .entries()
722 /// # }, |mut stream| async move {
723 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
724 /// # let mut results = Vec::new();
725 /// # for _ in 0..4 {
726 /// # results.push(stream.next().await.unwrap());
727 /// # }
728 /// # results.sort();
729 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
730 /// # }));
731 /// ```
732 pub fn flat_map_unordered<U, I, F>(
733 self,
734 f: impl IntoQuotedMut<'a, F, L> + Copy,
735 ) -> KeyedStream<K, U, L, B, NoOrder, R>
736 where
737 I: IntoIterator<Item = U>,
738 F: Fn(V) -> I + 'a,
739 K: Clone,
740 {
741 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
742 let flat_map_f = q!({
743 let orig = f;
744 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
745 })
746 .splice_fn1_ctx::<(K, V), _>(&self.location)
747 .into();
748
749 KeyedStream::new(
750 self.location.clone(),
751 HydroNode::FlatMap {
752 f: flat_map_f,
753 input: Box::new(self.ir_node.into_inner()),
754 metadata: self
755 .location
756 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
757 },
758 )
759 }
760
761 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
762 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
763 /// items in a **deterministic** order.
764 ///
765 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
766 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
767 ///
768 /// # Example
769 /// ```rust
770 /// # use hydro_lang::prelude::*;
771 /// # use futures::StreamExt;
772 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773 /// process
774 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
775 /// .into_keyed()
776 /// .flatten_ordered()
777 /// # .entries()
778 /// # }, |mut stream| async move {
779 /// // { 1: [2, 3, 4], 2: [5, 6] }
780 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
781 /// # assert_eq!(stream.next().await.unwrap(), w);
782 /// # }
783 /// # }));
784 /// ```
pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
{
    // Flattening is `flat_map_ordered` with the identity function: each value is itself the
    // collection to iterate over.
    self.flat_map_ordered(q!(|d| d))
}
792
793 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
794 /// for the value type `V` to produce items in any order.
795 ///
796 /// # Example
797 /// ```rust
798 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
799 /// # use futures::StreamExt;
800 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
801 /// process
802 /// .source_iter(q!(vec![
803 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
804 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
805 /// ]))
806 /// .into_keyed()
807 /// .flatten_unordered()
808 /// # .entries()
809 /// # }, |mut stream| async move {
810 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
811 /// # let mut results = Vec::new();
812 /// # for _ in 0..4 {
813 /// # results.push(stream.next().await.unwrap());
814 /// # }
815 /// # results.sort();
816 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
817 /// # }));
818 /// ```
pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
{
    // Flattening is `flat_map_unordered` with the identity function: each value is itself the
    // collection to iterate over.
    self.flat_map_unordered(q!(|d| d))
}
826
827 /// An operator which allows you to "inspect" each element of a stream without
828 /// modifying it. The closure `f` is called on a reference to each value. This is
829 /// mainly useful for debugging, and should not be used to generate side-effects.
830 ///
831 /// # Example
832 /// ```rust
833 /// # use hydro_lang::prelude::*;
834 /// # use futures::StreamExt;
835 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836 /// process
837 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
838 /// .into_keyed()
839 /// .inspect(q!(|v| println!("{}", v)))
840 /// # .entries()
841 /// # }, |mut stream| async move {
842 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
843 /// # assert_eq!(stream.next().await.unwrap(), w);
844 /// # }
845 /// # }));
846 /// ```
847 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
848 where
849 F: Fn(&V) + 'a,
850 {
851 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
852 let inspect_f = q!({
853 let orig = f;
854 move |t: &(_, _)| orig(&t.1)
855 })
856 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
857 .into();
858
859 KeyedStream::new(
860 self.location.clone(),
861 HydroNode::Inspect {
862 f: inspect_f,
863 input: Box::new(self.ir_node.into_inner()),
864 metadata: self.location.new_node_metadata(Self::collection_kind()),
865 },
866 )
867 }
868
869 /// An operator which allows you to "inspect" each element of a stream without
870 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
871 /// mainly useful for debugging, and should not be used to generate side-effects.
872 ///
873 /// # Example
874 /// ```rust
875 /// # use hydro_lang::prelude::*;
876 /// # use futures::StreamExt;
877 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
878 /// process
879 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
880 /// .into_keyed()
881 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
882 /// # .entries()
883 /// # }, |mut stream| async move {
884 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
885 /// # assert_eq!(stream.next().await.unwrap(), w);
886 /// # }
887 /// # }));
888 /// ```
889 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
890 where
891 F: Fn(&(K, V)) + 'a,
892 {
893 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
894
895 KeyedStream::new(
896 self.location.clone(),
897 HydroNode::Inspect {
898 f: inspect_f,
899 input: Box::new(self.ir_node.into_inner()),
900 metadata: self.location.new_node_metadata(Self::collection_kind()),
901 },
902 )
903 }
904
905 /// An operator which allows you to "name" a `HydroNode`.
906 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
907 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
908 {
909 let mut node = self.ir_node.borrow_mut();
910 let metadata = node.metadata_mut();
911 metadata.tag = Some(name.to_string());
912 }
913 self
914 }
915}
916
917impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
918 KeyedStream<K, V, L, Unbounded, O, R>
919{
920 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
921 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
922 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
923 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
924 ///
925 /// Currently, both input streams must be [`Unbounded`].
926 ///
927 /// # Example
928 /// ```rust
929 /// # use hydro_lang::prelude::*;
930 /// # use futures::StreamExt;
931 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
932 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
933 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
934 /// numbers1.interleave(numbers2)
935 /// # .entries()
936 /// # }, |mut stream| async move {
937 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
938 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
939 /// # assert_eq!(stream.next().await.unwrap(), w);
940 /// # }
941 /// # }));
942 /// ```
943 pub fn interleave<O2: Ordering, R2: Retries>(
944 self,
945 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
946 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
947 where
948 R: MinRetries<R2>,
949 {
950 let tick = self.location.tick();
951 // Because the outputs are unordered, we can interleave batches from both streams.
952 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
953 self.batch(&tick, nondet_batch_interleaving)
954 .weakest_ordering()
955 .chain(
956 other
957 .batch(&tick, nondet_batch_interleaving)
958 .weakest_ordering(),
959 )
960 .all_ticks()
961 }
962}
963
/// The value returned by the step function of a Hydro generator (see
/// [`KeyedStream::generator`]). Each variant says whether an element is emitted and whether
/// later inputs are still processed.
pub enum Generate<T> {
    /// Emits the wrapped element; later inputs are still processed.
    Yield(T),
    /// Emits the wrapped element as the _last_ one; later inputs are not processed.
    Return(T),
    /// Emits nothing; later inputs are still processed.
    Continue,
    /// Emits nothing; later inputs are not processed.
    Break,
}
976
977impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
978where
979 K: Eq + Hash,
980 L: Location<'a>,
981{
982 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
983 ///
984 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
985 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
986 /// early by returning `None`.
987 ///
988 /// The function takes a mutable reference to the accumulator and the current element, and returns
989 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
990 /// If the function returns `None`, the stream is terminated and no more elements are processed.
991 ///
992 /// # Example
993 /// ```rust
994 /// # use hydro_lang::prelude::*;
995 /// # use futures::StreamExt;
996 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
997 /// process
998 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
999 /// .into_keyed()
1000 /// .scan(
1001 /// q!(|| 0),
1002 /// q!(|acc, x| {
1003 /// *acc += x;
1004 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1005 /// }),
1006 /// )
1007 /// # .entries()
1008 /// # }, |mut stream| async move {
1009 /// // Output: { 0: [1], 1: [3, 7] }
1010 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1011 /// # assert_eq!(stream.next().await.unwrap(), w);
1012 /// # }
1013 /// # }));
1014 /// ```
1015 pub fn scan<A, U, I, F>(
1016 self,
1017 init: impl IntoQuotedMut<'a, I, L> + Copy,
1018 f: impl IntoQuotedMut<'a, F, L> + Copy,
1019 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1020 where
1021 K: Clone,
1022 I: Fn() -> A + 'a,
1023 F: Fn(&mut A, V) -> Option<U> + 'a,
1024 {
1025 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1026 self.generator(
1027 init,
1028 q!({
1029 let orig = f;
1030 move |state, v| {
1031 if let Some(out) = orig(state, v) {
1032 Generate::Yield(out)
1033 } else {
1034 Generate::Break
1035 }
1036 }
1037 }),
1038 )
1039 }
1040
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// Once a group's logic returns [`Generate::Return`] or [`Generate::Break`], the state of
    /// that group is dropped and all later values for the same key are ignored; other groups
    /// are unaffected.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_string()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_string()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Wrap the user-provided closures so they can be referenced from the quoted scan
        // closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The scan accumulator maps each key to its group state. `Some(state)` means the group
        // is still live; `None` means the group has terminated.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
            .into();
        // The scan closure always returns `Some(..)`, so the scan itself never terminates; the
        // inner `Option<(K, U)>` says whether this input produced an output element.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // First value seen for a key: create the group's state from `init`.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        // Drop the state but keep the (now `None`) entry as a tombstone so
                        // later values for this key are ignored.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        // Same tombstoning as `Return`, but nothing is emitted.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // The group already terminated: swallow the value.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
        .into();

        // The scan produces a plain stream of `Option<(K, U)>` ...
        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // ... which is flattened with an identity `FlatMap`, dropping the `None`s and leaving
        // the `(K, U)` entries of the resulting keyed stream.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: self.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(self.location.clone(), flatten_node)
    }
1153
1154 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1155 /// in-order across the values in each group. But the aggregation function returns a boolean,
1156 /// which when true indicates that the aggregated result is complete and can be released to
1157 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1158 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1159 /// normal stream elements.
1160 ///
1161 /// # Example
1162 /// ```rust
1163 /// # use hydro_lang::prelude::*;
1164 /// # use futures::StreamExt;
1165 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1166 /// process
1167 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1168 /// .into_keyed()
1169 /// .fold_early_stop(
1170 /// q!(|| 0),
1171 /// q!(|acc, x| {
1172 /// *acc += x;
1173 /// x % 2 == 0
1174 /// }),
1175 /// )
1176 /// # .entries()
1177 /// # }, |mut stream| async move {
1178 /// // Output: { 0: 2, 1: 9 }
1179 /// # for w in vec![(0, 2), (1, 9)] {
1180 /// # assert_eq!(stream.next().await.unwrap(), w);
1181 /// # }
1182 /// # }));
1183 /// ```
1184 pub fn fold_early_stop<A, I, F>(
1185 self,
1186 init: impl IntoQuotedMut<'a, I, L> + Copy,
1187 f: impl IntoQuotedMut<'a, F, L> + Copy,
1188 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1189 where
1190 K: Clone,
1191 I: Fn() -> A + 'a,
1192 F: Fn(&mut A, V) -> bool + 'a,
1193 {
1194 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1195 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1196 let out_without_bound_cast = self.generator(
1197 q!(move || Some(init())),
1198 q!(move |key_state, v| {
1199 if let Some(key_state_value) = key_state.as_mut() {
1200 if f(key_state_value, v) {
1201 Generate::Return(key_state.take().unwrap())
1202 } else {
1203 Generate::Continue
1204 }
1205 } else {
1206 unreachable!()
1207 }
1208 }),
1209 );
1210
1211 KeyedSingleton::new(
1212 out_without_bound_cast.location.clone(),
1213 HydroNode::Cast {
1214 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1215 metadata: out_without_bound_cast
1216 .location
1217 .new_node_metadata(
1218 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1219 ),
1220 },
1221 )
1222 }
1223
1224 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1225 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1226 /// otherwise the first element would be non-deterministic.
1227 ///
1228 /// # Example
1229 /// ```rust
1230 /// # use hydro_lang::prelude::*;
1231 /// # use futures::StreamExt;
1232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233 /// process
1234 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1235 /// .into_keyed()
1236 /// .first()
1237 /// # .entries()
1238 /// # }, |mut stream| async move {
1239 /// // Output: { 0: 2, 1: 3 }
1240 /// # for w in vec![(0, 2), (1, 3)] {
1241 /// # assert_eq!(stream.next().await.unwrap(), w);
1242 /// # }
1243 /// # }));
1244 /// ```
1245 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1246 where
1247 K: Clone,
1248 {
1249 self.fold_early_stop(
1250 q!(|| None),
1251 q!(|acc, v| {
1252 *acc = Some(v);
1253 true
1254 }),
1255 )
1256 .map(q!(|v| v.unwrap()))
1257 }
1258
1259 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1260 ///
1261 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1262 /// to depend on the order of elements in the group.
1263 ///
1264 /// If the input and output value types are the same and do not require initialization then use
1265 /// [`KeyedStream::reduce`].
1266 ///
1267 /// # Example
1268 /// ```rust
1269 /// # use hydro_lang::prelude::*;
1270 /// # use futures::StreamExt;
1271 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1272 /// let tick = process.tick();
1273 /// let numbers = process
1274 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1275 /// .into_keyed();
1276 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1277 /// batch
1278 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
1279 /// .entries()
1280 /// .all_ticks()
1281 /// # }, |mut stream| async move {
1282 /// // (1, 5), (2, 7)
1283 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1284 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1285 /// # }));
1286 /// ```
1287 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1288 self,
1289 init: impl IntoQuotedMut<'a, I, L>,
1290 comb: impl IntoQuotedMut<'a, F, L>,
1291 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1292 let init = init.splice_fn0_ctx(&self.location).into();
1293 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1294
1295 KeyedSingleton::new(
1296 self.location.clone(),
1297 HydroNode::FoldKeyed {
1298 init,
1299 acc: comb,
1300 input: Box::new(self.ir_node.into_inner()),
1301 metadata: self.location.new_node_metadata(KeyedSingleton::<
1302 K,
1303 A,
1304 L,
1305 B::WhenValueUnbounded,
1306 >::collection_kind()),
1307 },
1308 )
1309 }
1310
1311 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1312 ///
1313 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1314 /// to depend on the order of elements in the stream.
1315 ///
1316 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1317 ///
1318 /// # Example
1319 /// ```rust
1320 /// # use hydro_lang::prelude::*;
1321 /// # use futures::StreamExt;
1322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323 /// let tick = process.tick();
1324 /// let numbers = process
1325 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1326 /// .into_keyed();
1327 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1328 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1329 /// # }, |mut stream| async move {
1330 /// // (1, 5), (2, 7)
1331 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1332 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1333 /// # }));
1334 /// ```
1335 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1336 self,
1337 comb: impl IntoQuotedMut<'a, F, L>,
1338 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1339 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1340
1341 KeyedSingleton::new(
1342 self.location.clone(),
1343 HydroNode::ReduceKeyed {
1344 f,
1345 input: Box::new(self.ir_node.into_inner()),
1346 metadata: self.location.new_node_metadata(KeyedSingleton::<
1347 K,
1348 V,
1349 L,
1350 B::WhenValueUnbounded,
1351 >::collection_kind()),
1352 },
1353 )
1354 }
1355
1356 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1357 ///
1358 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1359 /// to depend on the order of elements in the stream.
1360 ///
1361 /// # Example
1362 /// ```rust
1363 /// # use hydro_lang::prelude::*;
1364 /// # use futures::StreamExt;
1365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366 /// let tick = process.tick();
1367 /// let watermark = tick.singleton(q!(1));
1368 /// let numbers = process
1369 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1370 /// .into_keyed();
1371 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1372 /// batch
1373 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1374 /// .entries()
1375 /// .all_ticks()
1376 /// # }, |mut stream| async move {
1377 /// // (2, 204)
1378 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1379 /// # }));
1380 /// ```
1381 pub fn reduce_watermark<O, F>(
1382 self,
1383 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1384 comb: impl IntoQuotedMut<'a, F, L>,
1385 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1386 where
1387 O: Clone,
1388 F: Fn(&mut V, V) + 'a,
1389 {
1390 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1391 check_matching_location(&self.location.root(), other.location.outer());
1392 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1393
1394 KeyedSingleton::new(
1395 self.location.clone(),
1396 HydroNode::ReduceKeyedWatermark {
1397 f,
1398 input: Box::new(self.ir_node.into_inner()),
1399 watermark: Box::new(other.ir_node.into_inner()),
1400 metadata: self.location.new_node_metadata(KeyedSingleton::<
1401 K,
1402 V,
1403 L,
1404 B::WhenValueUnbounded,
1405 >::collection_kind()),
1406 },
1407 )
1408 }
1409}
1410
1411impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1412where
1413 K: Eq + Hash,
1414 L: Location<'a>,
1415{
1416 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1417 ///
1418 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1419 ///
1420 /// If the input and output value types are the same and do not require initialization then use
1421 /// [`KeyedStream::reduce_commutative`].
1422 ///
1423 /// # Example
1424 /// ```rust
1425 /// # use hydro_lang::prelude::*;
1426 /// # use futures::StreamExt;
1427 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1428 /// let tick = process.tick();
1429 /// let numbers = process
1430 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1431 /// .into_keyed();
1432 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1433 /// batch
1434 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1435 /// .entries()
1436 /// .all_ticks()
1437 /// # }, |mut stream| async move {
1438 /// // (1, 5), (2, 7)
1439 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1440 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1441 /// # }));
1442 /// ```
1443 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1444 self,
1445 init: impl IntoQuotedMut<'a, I, L>,
1446 comb: impl IntoQuotedMut<'a, F, L>,
1447 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1448 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1449 .fold(init, comb)
1450 }
1451
1452 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1453 ///
1454 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1455 ///
1456 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1457 ///
1458 /// # Example
1459 /// ```rust
1460 /// # use hydro_lang::prelude::*;
1461 /// # use futures::StreamExt;
1462 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1463 /// let tick = process.tick();
1464 /// let numbers = process
1465 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1466 /// .into_keyed();
1467 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1468 /// batch
1469 /// .reduce_commutative(q!(|acc, x| *acc += x))
1470 /// .entries()
1471 /// .all_ticks()
1472 /// # }, |mut stream| async move {
1473 /// // (1, 5), (2, 7)
1474 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1475 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1476 /// # }));
1477 /// ```
1478 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1479 self,
1480 comb: impl IntoQuotedMut<'a, F, L>,
1481 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1482 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1483 .reduce(comb)
1484 }
1485
1486 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1487 ///
1488 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1489 ///
1490 /// # Example
1491 /// ```rust
1492 /// # use hydro_lang::prelude::*;
1493 /// # use futures::StreamExt;
1494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495 /// let tick = process.tick();
1496 /// let watermark = tick.singleton(q!(1));
1497 /// let numbers = process
1498 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1499 /// .into_keyed();
1500 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1501 /// batch
1502 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1503 /// .entries()
1504 /// .all_ticks()
1505 /// # }, |mut stream| async move {
1506 /// // (2, 204)
1507 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1508 /// # }));
1509 /// ```
1510 pub fn reduce_watermark_commutative<O2, F>(
1511 self,
1512 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1513 comb: impl IntoQuotedMut<'a, F, L>,
1514 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1515 where
1516 O2: Clone,
1517 F: Fn(&mut V, V) + 'a,
1518 {
1519 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1520 .reduce_watermark(other, comb)
1521 }
1522}
1523
1524impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1525where
1526 K: Eq + Hash,
1527 L: Location<'a>,
1528{
1529 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1530 ///
1531 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1532 ///
1533 /// If the input and output value types are the same and do not require initialization then use
1534 /// [`KeyedStream::reduce_idempotent`].
1535 ///
1536 /// # Example
1537 /// ```rust
1538 /// # use hydro_lang::prelude::*;
1539 /// # use futures::StreamExt;
1540 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1541 /// let tick = process.tick();
1542 /// let numbers = process
1543 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1544 /// .into_keyed();
1545 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1546 /// batch
1547 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1548 /// .entries()
1549 /// .all_ticks()
1550 /// # }, |mut stream| async move {
1551 /// // (1, false), (2, true)
1552 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1553 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1554 /// # }));
1555 /// ```
1556 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1557 self,
1558 init: impl IntoQuotedMut<'a, I, L>,
1559 comb: impl IntoQuotedMut<'a, F, L>,
1560 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1561 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1562 .fold(init, comb)
1563 }
1564
1565 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1566 ///
1567 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1568 ///
1569 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1570 ///
1571 /// # Example
1572 /// ```rust
1573 /// # use hydro_lang::prelude::*;
1574 /// # use futures::StreamExt;
1575 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1576 /// let tick = process.tick();
1577 /// let numbers = process
1578 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1579 /// .into_keyed();
1580 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1581 /// batch
1582 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1583 /// .entries()
1584 /// .all_ticks()
1585 /// # }, |mut stream| async move {
1586 /// // (1, false), (2, true)
1587 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1588 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1589 /// # }));
1590 /// ```
1591 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1592 self,
1593 comb: impl IntoQuotedMut<'a, F, L>,
1594 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1595 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1596 .reduce(comb)
1597 }
1598
1599 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1600 ///
1601 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1602 ///
1603 /// # Example
1604 /// ```rust
1605 /// # use hydro_lang::prelude::*;
1606 /// # use futures::StreamExt;
1607 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1608 /// let tick = process.tick();
1609 /// let watermark = tick.singleton(q!(1));
1610 /// let numbers = process
1611 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1612 /// .into_keyed();
1613 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1614 /// batch
1615 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1616 /// .entries()
1617 /// .all_ticks()
1618 /// # }, |mut stream| async move {
1619 /// // (2, true)
1620 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1621 /// # }));
1622 /// ```
1623 pub fn reduce_watermark_idempotent<O2, F>(
1624 self,
1625 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1626 comb: impl IntoQuotedMut<'a, F, L>,
1627 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1628 where
1629 O2: Clone,
1630 F: Fn(&mut V, V) + 'a,
1631 {
1632 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1633 .reduce_watermark(other, comb)
1634 }
1635}
1636
1637impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1638where
1639 K: Eq + Hash,
1640 L: Location<'a>,
1641{
1642 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1643 ///
1644 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1645 /// as there may be non-deterministic duplicates.
1646 ///
1647 /// If the input and output value types are the same and do not require initialization then use
1648 /// [`KeyedStream::reduce_commutative_idempotent`].
1649 ///
1650 /// # Example
1651 /// ```rust
1652 /// # use hydro_lang::prelude::*;
1653 /// # use futures::StreamExt;
1654 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1655 /// let tick = process.tick();
1656 /// let numbers = process
1657 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1658 /// .into_keyed();
1659 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1660 /// batch
1661 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1662 /// .entries()
1663 /// .all_ticks()
1664 /// # }, |mut stream| async move {
1665 /// // (1, false), (2, true)
1666 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1667 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1668 /// # }));
1669 /// ```
1670 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1671 self,
1672 init: impl IntoQuotedMut<'a, I, L>,
1673 comb: impl IntoQuotedMut<'a, F, L>,
1674 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1675 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1676 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1677 .fold(init, comb)
1678 }
1679
1680 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1681 ///
1682 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1683 /// as there may be non-deterministic duplicates.
1684 ///
1685 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1686 ///
1687 /// # Example
1688 /// ```rust
1689 /// # use hydro_lang::prelude::*;
1690 /// # use futures::StreamExt;
1691 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1692 /// let tick = process.tick();
1693 /// let numbers = process
1694 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1695 /// .into_keyed();
1696 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1697 /// batch
1698 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1699 /// .entries()
1700 /// .all_ticks()
1701 /// # }, |mut stream| async move {
1702 /// // (1, false), (2, true)
1703 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1704 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1705 /// # }));
1706 /// ```
1707 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1708 self,
1709 comb: impl IntoQuotedMut<'a, F, L>,
1710 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1711 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1712 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1713 .reduce(comb)
1714 }
1715
1716 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1717 ///
1718 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1719 /// as there may be non-deterministic duplicates.
1720 ///
1721 /// # Example
1722 /// ```rust
1723 /// # use hydro_lang::prelude::*;
1724 /// # use futures::StreamExt;
1725 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1726 /// let tick = process.tick();
1727 /// let watermark = tick.singleton(q!(1));
1728 /// let numbers = process
1729 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1730 /// .into_keyed();
1731 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1732 /// batch
1733 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1734 /// .entries()
1735 /// .all_ticks()
1736 /// # }, |mut stream| async move {
1737 /// // (2, true)
1738 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1739 /// # }));
1740 /// ```
1741 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1742 self,
1743 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1744 comb: impl IntoQuotedMut<'a, F, L>,
1745 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1746 where
1747 O2: Clone,
1748 F: Fn(&mut V, V) + 'a,
1749 {
1750 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1751 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1752 .reduce_watermark(other, comb)
1753 }
1754
1755 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1756 /// whose keys are not in the bounded stream.
1757 ///
1758 /// # Example
1759 /// ```rust
1760 /// # use hydro_lang::prelude::*;
1761 /// # use futures::StreamExt;
1762 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1763 /// let tick = process.tick();
1764 /// let keyed_stream = process
1765 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1766 /// .batch(&tick, nondet!(/** test */))
1767 /// .into_keyed();
1768 /// let keys_to_remove = process
1769 /// .source_iter(q!(vec![1, 2]))
1770 /// .batch(&tick, nondet!(/** test */));
1771 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1772 /// # .entries()
1773 /// # }, |mut stream| async move {
1774 /// // { 3: ['c'], 4: ['d'] }
1775 /// # for w in vec![(3, 'c'), (4, 'd')] {
1776 /// # assert_eq!(stream.next().await.unwrap(), w);
1777 /// # }
1778 /// # }));
1779 /// ```
1780 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1781 self,
1782 other: Stream<K, L, Bounded, O2, R2>,
1783 ) -> Self {
1784 check_matching_location(&self.location, &other.location);
1785
1786 KeyedStream::new(
1787 self.location.clone(),
1788 HydroNode::AntiJoin {
1789 pos: Box::new(self.ir_node.into_inner()),
1790 neg: Box::new(other.ir_node.into_inner()),
1791 metadata: self.location.new_node_metadata(Self::collection_kind()),
1792 },
1793 )
1794 }
1795}
1796
1797impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1798where
1799 L: Location<'a>,
1800{
1801 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1802 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1803 ///
1804 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1805 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1806 /// argument that declares where the stream will be atomically processed. Batching a stream into
1807 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1808 /// [`Tick`] will introduce asynchrony.
1809 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1810 let out_location = Atomic { tick: tick.clone() };
1811 KeyedStream::new(
1812 out_location.clone(),
1813 HydroNode::BeginAtomic {
1814 inner: Box::new(self.ir_node.into_inner()),
1815 metadata: out_location
1816 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1817 },
1818 )
1819 }
1820
1821 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1822 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1823 /// the order of the input.
1824 ///
1825 /// # Non-Determinism
1826 /// The batch boundaries are non-deterministic and may change across executions.
1827 pub fn batch(
1828 self,
1829 tick: &Tick<L>,
1830 nondet: NonDet,
1831 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1832 let _ = nondet;
1833 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1834 KeyedStream::new(
1835 tick.clone(),
1836 HydroNode::Batch {
1837 inner: Box::new(self.ir_node.into_inner()),
1838 metadata: tick.new_node_metadata(
1839 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1840 ),
1841 },
1842 )
1843 }
1844}
1845
1846impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1847where
1848 L: Location<'a> + NoTick,
1849{
1850 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1851 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1852 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1853 /// used to create the atomic section.
1854 ///
1855 /// # Non-Determinism
1856 /// The batch boundaries are non-deterministic and may change across executions.
1857 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1858 let _ = nondet;
1859 KeyedStream::new(
1860 self.location.clone().tick,
1861 HydroNode::Batch {
1862 inner: Box::new(self.ir_node.into_inner()),
1863 metadata: self.location.tick.new_node_metadata(KeyedStream::<
1864 K,
1865 V,
1866 Tick<L>,
1867 Bounded,
1868 O,
1869 R,
1870 >::collection_kind(
1871 )),
1872 },
1873 )
1874 }
1875
1876 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1877 /// See [`KeyedStream::atomic`] for more details.
1878 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1879 KeyedStream::new(
1880 self.location.tick.l.clone(),
1881 HydroNode::EndAtomic {
1882 inner: Box::new(self.ir_node.into_inner()),
1883 metadata: self
1884 .location
1885 .tick
1886 .l
1887 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1888 },
1889 )
1890 }
1891}
1892
1893impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1894where
1895 L: Location<'a>,
1896{
1897 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1898 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1899 /// is only present in one of the inputs, its values are passed through as-is). The output has
1900 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1901 ///
1902 /// Currently, both input streams must be [`Bounded`]. This operator will block
1903 /// on the first stream until all its elements are available. In a future version,
1904 /// we will relax the requirement on the `other` stream.
1905 ///
1906 /// # Example
1907 /// ```rust
1908 /// # use hydro_lang::prelude::*;
1909 /// # use futures::StreamExt;
1910 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1911 /// let tick = process.tick();
1912 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1913 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1914 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1915 /// # .entries()
1916 /// # }, |mut stream| async move {
1917 /// // { 0: [2, 1], 1: [4, 3] }
1918 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1919 /// # assert_eq!(stream.next().await.unwrap(), w);
1920 /// # }
1921 /// # }));
1922 /// ```
1923 pub fn chain<O2: Ordering, R2: Retries>(
1924 self,
1925 other: KeyedStream<K, V, L, Bounded, O2, R2>,
1926 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1927 where
1928 O: MinOrder<O2>,
1929 R: MinRetries<R2>,
1930 {
1931 check_matching_location(&self.location, &other.location);
1932
1933 KeyedStream::new(
1934 self.location.clone(),
1935 HydroNode::Chain {
1936 first: Box::new(self.ir_node.into_inner()),
1937 second: Box::new(other.ir_node.into_inner()),
1938 metadata: self.location.new_node_metadata(KeyedStream::<
1939 K,
1940 V,
1941 L,
1942 Bounded,
1943 <O as MinOrder<O2>>::Min,
1944 <R as MinRetries<R2>>::Min,
1945 >::collection_kind()),
1946 },
1947 )
1948 }
1949}
1950
1951impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1952where
1953 L: Location<'a>,
1954{
1955 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1956 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1957 /// each key.
1958 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1959 KeyedStream::new(
1960 self.location.outer().clone(),
1961 HydroNode::YieldConcat {
1962 inner: Box::new(self.ir_node.into_inner()),
1963 metadata: self.location.outer().new_node_metadata(KeyedStream::<
1964 K,
1965 V,
1966 L,
1967 Unbounded,
1968 O,
1969 R,
1970 >::collection_kind(
1971 )),
1972 },
1973 )
1974 }
1975
1976 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1977 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1978 /// each key.
1979 ///
1980 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1981 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1982 /// stream's [`Tick`] context.
1983 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
1984 let out_location = Atomic {
1985 tick: self.location.clone(),
1986 };
1987
1988 KeyedStream::new(
1989 out_location.clone(),
1990 HydroNode::YieldConcat {
1991 inner: Box::new(self.ir_node.into_inner()),
1992 metadata: out_location.new_node_metadata(KeyedStream::<
1993 K,
1994 V,
1995 Atomic<L>,
1996 Unbounded,
1997 O,
1998 R,
1999 >::collection_kind()),
2000 },
2001 )
2002 }
2003
2004 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2005 /// tick `T` always has the entries of `self` at tick `T - 1`.
2006 ///
2007 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2008 ///
2009 /// This operator enables stateful iterative processing with ticks, by sending data from one
2010 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2011 ///
2012 /// # Example
2013 /// ```rust
2014 /// # use hydro_lang::prelude::*;
2015 /// # use futures::StreamExt;
2016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2017 /// let tick = process.tick();
2018 /// # // ticks are lazy by default, forces the second tick to run
2019 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2020 /// # let batch_first_tick = process
2021 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2022 /// # .batch(&tick, nondet!(/** test */))
2023 /// # .into_keyed();
2024 /// # let batch_second_tick = process
2025 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2026 /// # .batch(&tick, nondet!(/** test */))
2027 /// # .defer_tick()
2028 /// # .into_keyed(); // appears on the second tick
2029 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2030 /// # batch_first_tick.chain(batch_second_tick);
2031 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2032 /// changes_across_ticks // from the current tick
2033 /// )
2034 /// # .entries().all_ticks()
2035 /// # }, |mut stream| async move {
2036 /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2037 /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2038 /// # assert_eq!(stream.next().await.unwrap(), w);
2039 /// # }
2040 /// # }));
2041 /// ```
2042 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2043 KeyedStream::new(
2044 self.location.clone(),
2045 HydroNode::DeferTick {
2046 input: Box::new(self.ir_node.into_inner()),
2047 metadata: self.location.new_node_metadata(KeyedStream::<
2048 K,
2049 V,
2050 Tick<L>,
2051 Bounded,
2052 O,
2053 R,
2054 >::collection_kind()),
2055 },
2056 )
2057 }
2058}
2059
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::ExactlyOnce;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Sums values per key with `reduce_watermark` against a constant watermark of `1` and
    /// checks that the first entry received by the external is `(2, 204)`.
    ///
    /// NOTE(review): presumably the entries for keys `0` and `1` are suppressed by the
    /// watermark; the assertion itself only pins the first received entry.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed();
        let sums = keyed_input.reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let sums_port = sums
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting so that no entry is missed.
        let mut sums_recv = deployed.connect(sums_port).await;

        deployment.start().await.unwrap();

        let first_entry = sums_recv.next().await.unwrap();
        assert_eq!(first_entry, (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that starts at `1` and is
    /// incremented by one on every tick, while an extra `(3, 103)` entry is only admitted
    /// once the external sends a trigger message.
    ///
    /// The test expects `(2, 204)` first and, after the trigger is sent, `(3, 103)` as the
    /// next received entry.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();
        let (trigger_port, trigger_stream) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();

        // Watermark cycle: 1 on the first tick, previous value + 1 on every later tick.
        let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let incremented_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(incremented_watermark);

        // Gate the extra entry on the first trigger message in a batch.
        let trigger_in_tick = trigger_stream
            .clone()
            .batch(&tick, nondet!(/** test */))
            .first();
        let gated_input = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */))
            .filter_if_some(trigger_in_tick)
            .all_ticks();

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(gated_input)
            .into_keyed();
        let sums = keyed_input.reduce_watermark_commutative(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let sums_port = sums
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = flow
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_send = deployed.connect(trigger_port).await;
        let mut sums_recv = deployed.connect(sums_port).await;

        deployment.start().await.unwrap();

        let before_trigger = sums_recv.next().await.unwrap();
        assert_eq!(before_trigger, (2, 204));

        trigger_send.send(()).await.unwrap();

        let after_trigger = sums_recv.next().await.unwrap();
        assert_eq!(after_trigger, (3, 103));
    }
}