hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// Streaming elements of type `V` grouped by a key of type `K`.
33///
34/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
35/// order of keys is non-deterministic but the order *within* each group may be deterministic.
36///
37/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
38/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
39/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
40///
41/// Type Parameters:
42/// - `K`: the type of the key for each group
43/// - `V`: the type of the elements inside each group
44/// - `Loc`: the [`Location`] where the keyed stream is materialized
45/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
46/// - `Order`: tracks whether the elements within each group have deterministic order
47/// ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
50pub struct KeyedStream<
51 K,
52 V,
53 Loc,
54 Bound: Boundedness,
55 Order: Ordering = TotalOrder,
56 Retry: Retries = ExactlyOnce,
57> {
58 pub(crate) location: Loc,
59 pub(crate) ir_node: RefCell<HydroNode>,
60
61 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
62}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65 for KeyedStream<K, V, L, B, NoOrder, R>
66where
67 L: Location<'a>,
68{
69 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70 KeyedStream {
71 location: stream.location,
72 ir_node: stream.ir_node,
73 _phantom: PhantomData,
74 }
75 }
76}
77
78impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
79where
80 L: Location<'a>,
81{
82 fn defer_tick(self) -> Self {
83 KeyedStream::defer_tick(self)
84 }
85}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90 L: Location<'a>,
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 KeyedStream {
96 location: location.clone(),
97 ir_node: RefCell::new(HydroNode::CycleSource {
98 ident,
99 metadata: location.new_node_metadata(
100 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101 ),
102 }),
103 _phantom: PhantomData,
104 }
105 }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132 for KeyedStream<K, V, L, B, O, R>
133where
134 L: Location<'a> + NoTick,
135{
136 type Location = L;
137
138 fn create_source(ident: syn::Ident, location: L) -> Self {
139 KeyedStream {
140 location: location.clone(),
141 ir_node: RefCell::new(HydroNode::CycleSource {
142 ident,
143 metadata: location
144 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152 for KeyedStream<K, V, L, B, O, R>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176 fn clone(&self) -> Self {
177 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179 *self.ir_node.borrow_mut() = HydroNode::Tee {
180 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181 metadata: self.location.new_node_metadata(Self::collection_kind()),
182 };
183 }
184
185 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186 KeyedStream {
187 location: self.location.clone(),
188 ir_node: HydroNode::Tee {
189 inner: TeeNode(inner.0.clone()),
190 metadata: metadata.clone(),
191 }
192 .into(),
193 _phantom: PhantomData,
194 }
195 } else {
196 unreachable!()
197 }
198 }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202 KeyedStream<K, V, L, B, O, R>
203{
204 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208 KeyedStream {
209 location,
210 ir_node: RefCell::new(ir_node),
211 _phantom: PhantomData,
212 }
213 }
214
215 /// Returns the [`CollectionKind`] corresponding to this type.
216 pub fn collection_kind() -> CollectionKind {
217 CollectionKind::KeyedStream {
218 bound: B::BOUND_KIND,
219 value_order: O::ORDERING_KIND,
220 value_retry: R::RETRIES_KIND,
221 key_type: stageleft::quote_type::<K>().into(),
222 value_type: stageleft::quote_type::<V>().into(),
223 }
224 }
225
226 /// Returns the [`Location`] where this keyed stream is being materialized.
227 pub fn location(&self) -> &L {
228 &self.location
229 }
230
231 /// Explicitly "casts" the keyed stream to a type with a different ordering
232 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233 /// by the type-system.
234 ///
235 /// # Non-Determinism
236 /// This function is used as an escape hatch, and any mistakes in the
237 /// provided ordering guarantee will propagate into the guarantees
238 /// for the rest of the program.
239 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240 if O::ORDERING_KIND == O2::ORDERING_KIND {
241 KeyedStream::new(self.location, self.ir_node.into_inner())
242 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243 // We can always weaken the ordering guarantee
244 KeyedStream::new(
245 self.location.clone(),
246 HydroNode::Cast {
247 inner: Box::new(self.ir_node.into_inner()),
248 metadata: self
249 .location
250 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251 },
252 )
253 } else {
254 KeyedStream::new(
255 self.location.clone(),
256 HydroNode::ObserveNonDet {
257 inner: Box::new(self.ir_node.into_inner()),
258 trusted: false,
259 metadata: self
260 .location
261 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262 },
263 )
264 }
265 }
266
267 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
268 /// which is always safe because that is the weakest possible guarantee.
269 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
270 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
271 self.assume_ordering::<NoOrder>(nondet)
272 }
273
274 /// Explicitly "casts" the keyed stream to a type with a different retries
275 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
276 /// be proven by the type-system.
277 ///
278 /// # Non-Determinism
279 /// This function is used as an escape hatch, and any mistakes in the
280 /// provided retries guarantee will propagate into the guarantees
281 /// for the rest of the program.
282 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
283 if R::RETRIES_KIND == R2::RETRIES_KIND {
284 KeyedStream::new(self.location, self.ir_node.into_inner())
285 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
286 // We can always weaken the retries guarantee
287 KeyedStream::new(
288 self.location.clone(),
289 HydroNode::Cast {
290 inner: Box::new(self.ir_node.into_inner()),
291 metadata: self
292 .location
293 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
294 },
295 )
296 } else {
297 KeyedStream::new(
298 self.location.clone(),
299 HydroNode::ObserveNonDet {
300 inner: Box::new(self.ir_node.into_inner()),
301 trusted: false,
302 metadata: self
303 .location
304 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
305 },
306 )
307 }
308 }
309
310 /// Flattens the keyed stream into an unordered stream of key-value pairs.
311 ///
312 /// # Example
313 /// ```rust
314 /// # use hydro_lang::prelude::*;
315 /// # use futures::StreamExt;
316 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317 /// process
318 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
319 /// .into_keyed()
320 /// .entries()
321 /// # }, |mut stream| async move {
322 /// // (1, 2), (1, 3), (2, 4) in any order
323 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
324 /// # assert_eq!(stream.next().await.unwrap(), w);
325 /// # }
326 /// # }));
327 /// ```
328 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
329 Stream::new(
330 self.location.clone(),
331 HydroNode::Cast {
332 inner: Box::new(self.ir_node.into_inner()),
333 metadata: self
334 .location
335 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
336 },
337 )
338 }
339
340 /// Flattens the keyed stream into an unordered stream of only the values.
341 ///
342 /// # Example
343 /// ```rust
344 /// # use hydro_lang::prelude::*;
345 /// # use futures::StreamExt;
346 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
347 /// process
348 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
349 /// .into_keyed()
350 /// .values()
351 /// # }, |mut stream| async move {
352 /// // 2, 3, 4 in any order
353 /// # for w in vec![2, 3, 4] {
354 /// # assert_eq!(stream.next().await.unwrap(), w);
355 /// # }
356 /// # }));
357 /// ```
358 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
359 self.entries().map(q!(|(_, v)| v))
360 }
361
362 /// Transforms each value by invoking `f` on each element, with keys staying the same
363 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
364 ///
365 /// If you do not want to modify the stream and instead only want to view
366 /// each item use [`KeyedStream::inspect`] instead.
367 ///
368 /// # Example
369 /// ```rust
370 /// # use hydro_lang::prelude::*;
371 /// # use futures::StreamExt;
372 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
373 /// process
374 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
375 /// .into_keyed()
376 /// .map(q!(|v| v + 1))
377 /// # .entries()
378 /// # }, |mut stream| async move {
379 /// // { 1: [3, 4], 2: [5] }
380 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
381 /// # assert_eq!(stream.next().await.unwrap(), w);
382 /// # }
383 /// # }));
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
386 where
387 F: Fn(V) -> U + 'a,
388 {
389 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
390 let map_f = q!({
391 let orig = f;
392 move |(k, v)| (k, orig(v))
393 })
394 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
395 .into();
396
397 KeyedStream::new(
398 self.location.clone(),
399 HydroNode::Map {
400 f: map_f,
401 input: Box::new(self.ir_node.into_inner()),
402 metadata: self
403 .location
404 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
405 },
406 )
407 }
408
409 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
411 ///
412 /// If you do not want to modify the stream and instead only want to view
413 /// each item use [`KeyedStream::inspect_with_key`] instead.
414 ///
415 /// # Example
416 /// ```rust
417 /// # use hydro_lang::prelude::*;
418 /// # use futures::StreamExt;
419 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
420 /// process
421 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
422 /// .into_keyed()
423 /// .map_with_key(q!(|(k, v)| k + v))
424 /// # .entries()
425 /// # }, |mut stream| async move {
426 /// // { 1: [3, 4], 2: [6] }
427 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
428 /// # assert_eq!(stream.next().await.unwrap(), w);
429 /// # }
430 /// # }));
431 /// ```
432 pub fn map_with_key<U, F>(
433 self,
434 f: impl IntoQuotedMut<'a, F, L> + Copy,
435 ) -> KeyedStream<K, U, L, B, O, R>
436 where
437 F: Fn((K, V)) -> U + 'a,
438 K: Clone,
439 {
440 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
441 let map_f = q!({
442 let orig = f;
443 move |(k, v)| {
444 let out = orig((Clone::clone(&k), v));
445 (k, out)
446 }
447 })
448 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
449 .into();
450
451 KeyedStream::new(
452 self.location.clone(),
453 HydroNode::Map {
454 f: map_f,
455 input: Box::new(self.ir_node.into_inner()),
456 metadata: self
457 .location
458 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
459 },
460 )
461 }
462
463 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
464 /// `f`, preserving the order of the elements within the group.
465 ///
466 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
467 /// not modify or take ownership of the values. If you need to modify the values while filtering
468 /// use [`KeyedStream::filter_map`] instead.
469 ///
470 /// # Example
471 /// ```rust
472 /// # use hydro_lang::prelude::*;
473 /// # use futures::StreamExt;
474 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
475 /// process
476 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
477 /// .into_keyed()
478 /// .filter(q!(|&x| x > 2))
479 /// # .entries()
480 /// # }, |mut stream| async move {
481 /// // { 1: [3], 2: [4] }
482 /// # for w in vec![(1, 3), (2, 4)] {
483 /// # assert_eq!(stream.next().await.unwrap(), w);
484 /// # }
485 /// # }));
486 /// ```
487 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
488 where
489 F: Fn(&V) -> bool + 'a,
490 {
491 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
492 let filter_f = q!({
493 let orig = f;
494 move |t: &(_, _)| orig(&t.1)
495 })
496 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
497 .into();
498
499 KeyedStream::new(
500 self.location.clone(),
501 HydroNode::Filter {
502 f: filter_f,
503 input: Box::new(self.ir_node.into_inner()),
504 metadata: self.location.new_node_metadata(Self::collection_kind()),
505 },
506 )
507 }
508
509 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
510 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
511 ///
512 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
513 /// not modify or take ownership of the values. If you need to modify the values while filtering
514 /// use [`KeyedStream::filter_map_with_key`] instead.
515 ///
516 /// # Example
517 /// ```rust
518 /// # use hydro_lang::prelude::*;
519 /// # use futures::StreamExt;
520 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
521 /// process
522 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
523 /// .into_keyed()
524 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
525 /// # .entries()
526 /// # }, |mut stream| async move {
527 /// // { 1: [3], 2: [4] }
528 /// # for w in vec![(1, 3), (2, 4)] {
529 /// # assert_eq!(stream.next().await.unwrap(), w);
530 /// # }
531 /// # }));
532 /// ```
533 pub fn filter_with_key<F>(
534 self,
535 f: impl IntoQuotedMut<'a, F, L> + Copy,
536 ) -> KeyedStream<K, V, L, B, O, R>
537 where
538 F: Fn(&(K, V)) -> bool + 'a,
539 {
540 let filter_f = f
541 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
542 .into();
543
544 KeyedStream::new(
545 self.location.clone(),
546 HydroNode::Filter {
547 f: filter_f,
548 input: Box::new(self.ir_node.into_inner()),
549 metadata: self.location.new_node_metadata(Self::collection_kind()),
550 },
551 )
552 }
553
554 /// An operator that both filters and maps each value, with keys staying the same.
555 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
556 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
557 ///
558 /// # Example
559 /// ```rust
560 /// # use hydro_lang::prelude::*;
561 /// # use futures::StreamExt;
562 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563 /// process
564 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
565 /// .into_keyed()
566 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
567 /// # .entries()
568 /// # }, |mut stream| async move {
569 /// // { 1: [2], 2: [4] }
570 /// # for w in vec![(1, 2), (2, 4)] {
571 /// # assert_eq!(stream.next().await.unwrap(), w);
572 /// # }
573 /// # }));
574 /// ```
575 pub fn filter_map<U, F>(
576 self,
577 f: impl IntoQuotedMut<'a, F, L> + Copy,
578 ) -> KeyedStream<K, U, L, B, O, R>
579 where
580 F: Fn(V) -> Option<U> + 'a,
581 {
582 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
583 let filter_map_f = q!({
584 let orig = f;
585 move |(k, v)| orig(v).map(|o| (k, o))
586 })
587 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
588 .into();
589
590 KeyedStream::new(
591 self.location.clone(),
592 HydroNode::FilterMap {
593 f: filter_map_f,
594 input: Box::new(self.ir_node.into_inner()),
595 metadata: self
596 .location
597 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
598 },
599 )
600 }
601
602 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
604 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
605 ///
606 /// # Example
607 /// ```rust
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// process
612 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
613 /// .into_keyed()
614 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
615 /// # .entries()
616 /// # }, |mut stream| async move {
617 /// // { 2: [2] }
618 /// # for w in vec![(2, 2)] {
619 /// # assert_eq!(stream.next().await.unwrap(), w);
620 /// # }
621 /// # }));
622 /// ```
623 pub fn filter_map_with_key<U, F>(
624 self,
625 f: impl IntoQuotedMut<'a, F, L> + Copy,
626 ) -> KeyedStream<K, U, L, B, O, R>
627 where
628 F: Fn((K, V)) -> Option<U> + 'a,
629 K: Clone,
630 {
631 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
632 let filter_map_f = q!({
633 let orig = f;
634 move |(k, v)| {
635 let out = orig((Clone::clone(&k), v));
636 out.map(|o| (k, o))
637 }
638 })
639 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
640 .into();
641
642 KeyedStream::new(
643 self.location.clone(),
644 HydroNode::FilterMap {
645 f: filter_map_f,
646 input: Box::new(self.ir_node.into_inner()),
647 metadata: self
648 .location
649 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
650 },
651 )
652 }
653
654 /// For each value `v` in each group, transform `v` using `f` and then treat the
655 /// result as an [`Iterator`] to produce values one by one within the same group.
656 /// The implementation for [`Iterator`] for the output type `I` must produce items
657 /// in a **deterministic** order.
658 ///
659 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
660 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
661 ///
662 /// # Example
663 /// ```rust
664 /// # use hydro_lang::prelude::*;
665 /// # use futures::StreamExt;
666 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
667 /// process
668 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
669 /// .into_keyed()
670 /// .flat_map_ordered(q!(|x| x))
671 /// # .entries()
672 /// # }, |mut stream| async move {
673 /// // { 1: [2, 3, 4], 2: [5, 6] }
674 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
675 /// # assert_eq!(stream.next().await.unwrap(), w);
676 /// # }
677 /// # }));
678 /// ```
679 pub fn flat_map_ordered<U, I, F>(
680 self,
681 f: impl IntoQuotedMut<'a, F, L> + Copy,
682 ) -> KeyedStream<K, U, L, B, O, R>
683 where
684 I: IntoIterator<Item = U>,
685 F: Fn(V) -> I + 'a,
686 K: Clone,
687 {
688 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
689 let flat_map_f = q!({
690 let orig = f;
691 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
692 })
693 .splice_fn1_ctx::<(K, V), _>(&self.location)
694 .into();
695
696 KeyedStream::new(
697 self.location.clone(),
698 HydroNode::FlatMap {
699 f: flat_map_f,
700 input: Box::new(self.ir_node.into_inner()),
701 metadata: self
702 .location
703 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
704 },
705 )
706 }
707
708 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
709 /// for the output type `I` to produce items in any order.
710 ///
711 /// # Example
712 /// ```rust
713 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
714 /// # use futures::StreamExt;
715 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
716 /// process
717 /// .source_iter(q!(vec![
718 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
719 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
720 /// ]))
721 /// .into_keyed()
722 /// .flat_map_unordered(q!(|x| x))
723 /// # .entries()
724 /// # }, |mut stream| async move {
725 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
726 /// # let mut results = Vec::new();
727 /// # for _ in 0..4 {
728 /// # results.push(stream.next().await.unwrap());
729 /// # }
730 /// # results.sort();
731 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
732 /// # }));
733 /// ```
734 pub fn flat_map_unordered<U, I, F>(
735 self,
736 f: impl IntoQuotedMut<'a, F, L> + Copy,
737 ) -> KeyedStream<K, U, L, B, NoOrder, R>
738 where
739 I: IntoIterator<Item = U>,
740 F: Fn(V) -> I + 'a,
741 K: Clone,
742 {
743 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
744 let flat_map_f = q!({
745 let orig = f;
746 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
747 })
748 .splice_fn1_ctx::<(K, V), _>(&self.location)
749 .into();
750
751 KeyedStream::new(
752 self.location.clone(),
753 HydroNode::FlatMap {
754 f: flat_map_f,
755 input: Box::new(self.ir_node.into_inner()),
756 metadata: self
757 .location
758 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
759 },
760 )
761 }
762
763 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
764 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
765 /// items in a **deterministic** order.
766 ///
767 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
768 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
769 ///
770 /// # Example
771 /// ```rust
772 /// # use hydro_lang::prelude::*;
773 /// # use futures::StreamExt;
774 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775 /// process
776 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
777 /// .into_keyed()
778 /// .flatten_ordered()
779 /// # .entries()
780 /// # }, |mut stream| async move {
781 /// // { 1: [2, 3, 4], 2: [5, 6] }
782 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
783 /// # assert_eq!(stream.next().await.unwrap(), w);
784 /// # }
785 /// # }));
786 /// ```
787 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
788 where
789 V: IntoIterator<Item = U>,
790 K: Clone,
791 {
792 self.flat_map_ordered(q!(|d| d))
793 }
794
795 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
796 /// for the value type `V` to produce items in any order.
797 ///
798 /// # Example
799 /// ```rust
800 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
801 /// # use futures::StreamExt;
802 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
803 /// process
804 /// .source_iter(q!(vec![
805 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
806 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
807 /// ]))
808 /// .into_keyed()
809 /// .flatten_unordered()
810 /// # .entries()
811 /// # }, |mut stream| async move {
812 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
813 /// # let mut results = Vec::new();
814 /// # for _ in 0..4 {
815 /// # results.push(stream.next().await.unwrap());
816 /// # }
817 /// # results.sort();
818 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
819 /// # }));
820 /// ```
821 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
822 where
823 V: IntoIterator<Item = U>,
824 K: Clone,
825 {
826 self.flat_map_unordered(q!(|d| d))
827 }
828
829 /// An operator which allows you to "inspect" each element of a stream without
830 /// modifying it. The closure `f` is called on a reference to each value. This is
831 /// mainly useful for debugging, and should not be used to generate side-effects.
832 ///
833 /// # Example
834 /// ```rust
835 /// # use hydro_lang::prelude::*;
836 /// # use futures::StreamExt;
837 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
838 /// process
839 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
840 /// .into_keyed()
841 /// .inspect(q!(|v| println!("{}", v)))
842 /// # .entries()
843 /// # }, |mut stream| async move {
844 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
845 /// # assert_eq!(stream.next().await.unwrap(), w);
846 /// # }
847 /// # }));
848 /// ```
849 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
850 where
851 F: Fn(&V) + 'a,
852 {
853 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
854 let inspect_f = q!({
855 let orig = f;
856 move |t: &(_, _)| orig(&t.1)
857 })
858 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
859 .into();
860
861 KeyedStream::new(
862 self.location.clone(),
863 HydroNode::Inspect {
864 f: inspect_f,
865 input: Box::new(self.ir_node.into_inner()),
866 metadata: self.location.new_node_metadata(Self::collection_kind()),
867 },
868 )
869 }
870
871 /// An operator which allows you to "inspect" each element of a stream without
872 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
873 /// mainly useful for debugging, and should not be used to generate side-effects.
874 ///
875 /// # Example
876 /// ```rust
877 /// # use hydro_lang::prelude::*;
878 /// # use futures::StreamExt;
879 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880 /// process
881 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
882 /// .into_keyed()
883 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
884 /// # .entries()
885 /// # }, |mut stream| async move {
886 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
887 /// # assert_eq!(stream.next().await.unwrap(), w);
888 /// # }
889 /// # }));
890 /// ```
891 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
892 where
893 F: Fn(&(K, V)) + 'a,
894 {
895 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
896
897 KeyedStream::new(
898 self.location.clone(),
899 HydroNode::Inspect {
900 f: inspect_f,
901 input: Box::new(self.ir_node.into_inner()),
902 metadata: self.location.new_node_metadata(Self::collection_kind()),
903 },
904 )
905 }
906
907 /// An operator which allows you to "name" a `HydroNode`.
908 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
909 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
910 {
911 let mut node = self.ir_node.borrow_mut();
912 let metadata = node.metadata_mut();
913 metadata.tag = Some(name.to_string());
914 }
915 self
916 }
917}
918
919impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
920 KeyedStream<K, V, L, Unbounded, O, R>
921{
922 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
923 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
924 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
925 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
926 ///
927 /// Currently, both input streams must be [`Unbounded`].
928 ///
929 /// # Example
930 /// ```rust
931 /// # use hydro_lang::prelude::*;
932 /// # use futures::StreamExt;
933 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
935 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
936 /// numbers1.interleave(numbers2)
937 /// # .entries()
938 /// # }, |mut stream| async move {
939 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
940 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
941 /// # assert_eq!(stream.next().await.unwrap(), w);
942 /// # }
943 /// # }));
944 /// ```
945 pub fn interleave<O2: Ordering, R2: Retries>(
946 self,
947 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
948 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
949 where
950 R: MinRetries<R2>,
951 {
952 let tick = self.location.tick();
953 // Because the outputs are unordered, we can interleave batches from both streams.
954 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
955 self.batch(&tick, nondet_batch_interleaving)
956 .weakest_ordering()
957 .chain(
958 other
959 .batch(&tick, nondet_batch_interleaving)
960 .weakest_ordering(),
961 )
962 .all_ticks()
963 }
964}
965
/// The value returned by the step function of a Hydro generator created with
/// [`KeyedStream::generator`]. Each variant says whether an element is emitted for the
/// current input and whether the group keeps processing later inputs.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs of the group.
    Yield(T),
    /// Emit the provided element as the _final_ element of the group; later inputs for
    /// that group are not processed.
    Return(T),
    /// Emit nothing for this input, but continue processing future inputs of the group.
    Continue,
    /// Emit nothing for this input, and do not process any further inputs of the group.
    Break,
}
978
979impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
980where
981 K: Eq + Hash,
982 L: Location<'a>,
983{
984 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
985 ///
986 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
987 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
988 /// early by returning `None`.
989 ///
990 /// The function takes a mutable reference to the accumulator and the current element, and returns
991 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
992 /// If the function returns `None`, the stream is terminated and no more elements are processed.
993 ///
994 /// # Example
995 /// ```rust
996 /// # use hydro_lang::prelude::*;
997 /// # use futures::StreamExt;
998 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
999 /// process
1000 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1001 /// .into_keyed()
1002 /// .scan(
1003 /// q!(|| 0),
1004 /// q!(|acc, x| {
1005 /// *acc += x;
1006 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1007 /// }),
1008 /// )
1009 /// # .entries()
1010 /// # }, |mut stream| async move {
1011 /// // Output: { 0: [1], 1: [3, 7] }
1012 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1013 /// # assert_eq!(stream.next().await.unwrap(), w);
1014 /// # }
1015 /// # }));
1016 /// ```
1017 pub fn scan<A, U, I, F>(
1018 self,
1019 init: impl IntoQuotedMut<'a, I, L> + Copy,
1020 f: impl IntoQuotedMut<'a, F, L> + Copy,
1021 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1022 where
1023 K: Clone,
1024 I: Fn() -> A + 'a,
1025 F: Fn(&mut A, V) -> Option<U> + 'a,
1026 {
1027 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1028 self.generator(
1029 init,
1030 q!({
1031 let orig = f;
1032 move |state, v| {
1033 if let Some(out) = orig(state, v) {
1034 Generate::Yield(out)
1035 } else {
1036 Generate::Break
1037 }
1038 }
1039 }),
1040 )
1041 }
1042
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// Once a group has produced [`Generate::Return`] or [`Generate::Break`], its state is
    /// discarded and every later element with that key is ignored; other groups are unaffected.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_string()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_string()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Re-wrap the user closures so they can be called by name inside the `q!` block below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The scan accumulator is a per-key state table: `Some(state)` while a group is still
        // active, `None` once the group has returned or broken.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
            .into();
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // The first element seen for a key lazily creates that group's state via `init`.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                // Every branch returns an outer `Some(..)`; the inner `Option` says whether this
                // input produced an output. NOTE(review): presumably the outer `Some` is what
                // keeps the underlying scan running - confirm against `HydroNode::Scan`.
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        // Clear the state so later elements of this key are ignored.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        // Clear the state so later elements of this key are ignored.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // This group already finished: swallow the element without calling `f`.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
        .into();

        // The scan itself yields one `Option<(K, U)>` per input element.
        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten with the identity function to drop the `None` placeholders, leaving only
        // the emitted `(key, value)` pairs.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: self.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(self.location.clone(), flatten_node)
    }
1155
1156 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1157 /// in-order across the values in each group. But the aggregation function returns a boolean,
1158 /// which when true indicates that the aggregated result is complete and can be released to
1159 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1160 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1161 /// normal stream elements.
1162 ///
1163 /// # Example
1164 /// ```rust
1165 /// # use hydro_lang::prelude::*;
1166 /// # use futures::StreamExt;
1167 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1168 /// process
1169 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1170 /// .into_keyed()
1171 /// .fold_early_stop(
1172 /// q!(|| 0),
1173 /// q!(|acc, x| {
1174 /// *acc += x;
1175 /// x % 2 == 0
1176 /// }),
1177 /// )
1178 /// # .entries()
1179 /// # }, |mut stream| async move {
1180 /// // Output: { 0: 2, 1: 9 }
1181 /// # for w in vec![(0, 2), (1, 9)] {
1182 /// # assert_eq!(stream.next().await.unwrap(), w);
1183 /// # }
1184 /// # }));
1185 /// ```
1186 pub fn fold_early_stop<A, I, F>(
1187 self,
1188 init: impl IntoQuotedMut<'a, I, L> + Copy,
1189 f: impl IntoQuotedMut<'a, F, L> + Copy,
1190 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1191 where
1192 K: Clone,
1193 I: Fn() -> A + 'a,
1194 F: Fn(&mut A, V) -> bool + 'a,
1195 {
1196 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1197 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1198 let out_without_bound_cast = self.generator(
1199 q!(move || Some(init())),
1200 q!(move |key_state, v| {
1201 if let Some(key_state_value) = key_state.as_mut() {
1202 if f(key_state_value, v) {
1203 Generate::Return(key_state.take().unwrap())
1204 } else {
1205 Generate::Continue
1206 }
1207 } else {
1208 unreachable!()
1209 }
1210 }),
1211 );
1212
1213 KeyedSingleton::new(
1214 out_without_bound_cast.location.clone(),
1215 HydroNode::Cast {
1216 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1217 metadata: out_without_bound_cast
1218 .location
1219 .new_node_metadata(
1220 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1221 ),
1222 },
1223 )
1224 }
1225
1226 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1227 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1228 /// otherwise the first element would be non-deterministic.
1229 ///
1230 /// # Example
1231 /// ```rust
1232 /// # use hydro_lang::prelude::*;
1233 /// # use futures::StreamExt;
1234 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1235 /// process
1236 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1237 /// .into_keyed()
1238 /// .first()
1239 /// # .entries()
1240 /// # }, |mut stream| async move {
1241 /// // Output: { 0: 2, 1: 3 }
1242 /// # for w in vec![(0, 2), (1, 3)] {
1243 /// # assert_eq!(stream.next().await.unwrap(), w);
1244 /// # }
1245 /// # }));
1246 /// ```
1247 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1248 where
1249 K: Clone,
1250 {
1251 self.fold_early_stop(
1252 q!(|| None),
1253 q!(|acc, v| {
1254 *acc = Some(v);
1255 true
1256 }),
1257 )
1258 .map(q!(|v| v.unwrap()))
1259 }
1260
1261 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1262 ///
1263 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1264 /// to depend on the order of elements in the group.
1265 ///
1266 /// If the input and output value types are the same and do not require initialization then use
1267 /// [`KeyedStream::reduce`].
1268 ///
1269 /// # Example
1270 /// ```rust
1271 /// # use hydro_lang::prelude::*;
1272 /// # use futures::StreamExt;
1273 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1274 /// let tick = process.tick();
1275 /// let numbers = process
1276 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1277 /// .into_keyed();
1278 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279 /// batch
1280 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
1281 /// .entries()
1282 /// .all_ticks()
1283 /// # }, |mut stream| async move {
1284 /// // (1, 5), (2, 7)
1285 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1286 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1287 /// # }));
1288 /// ```
1289 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1290 self,
1291 init: impl IntoQuotedMut<'a, I, L>,
1292 comb: impl IntoQuotedMut<'a, F, L>,
1293 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1294 let init = init.splice_fn0_ctx(&self.location).into();
1295 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1296
1297 KeyedSingleton::new(
1298 self.location.clone(),
1299 HydroNode::FoldKeyed {
1300 init,
1301 acc: comb,
1302 input: Box::new(self.ir_node.into_inner()),
1303 metadata: self.location.new_node_metadata(KeyedSingleton::<
1304 K,
1305 A,
1306 L,
1307 B::WhenValueUnbounded,
1308 >::collection_kind()),
1309 },
1310 )
1311 }
1312
1313 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1314 ///
1315 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1316 /// to depend on the order of elements in the stream.
1317 ///
1318 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1319 ///
1320 /// # Example
1321 /// ```rust
1322 /// # use hydro_lang::prelude::*;
1323 /// # use futures::StreamExt;
1324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325 /// let tick = process.tick();
1326 /// let numbers = process
1327 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1328 /// .into_keyed();
1329 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1330 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1331 /// # }, |mut stream| async move {
1332 /// // (1, 5), (2, 7)
1333 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1334 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1335 /// # }));
1336 /// ```
1337 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1338 self,
1339 comb: impl IntoQuotedMut<'a, F, L>,
1340 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1341 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1342
1343 KeyedSingleton::new(
1344 self.location.clone(),
1345 HydroNode::ReduceKeyed {
1346 f,
1347 input: Box::new(self.ir_node.into_inner()),
1348 metadata: self.location.new_node_metadata(KeyedSingleton::<
1349 K,
1350 V,
1351 L,
1352 B::WhenValueUnbounded,
1353 >::collection_kind()),
1354 },
1355 )
1356 }
1357
1358 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1359 ///
1360 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1361 /// to depend on the order of elements in the stream.
1362 ///
1363 /// # Example
1364 /// ```rust
1365 /// # use hydro_lang::prelude::*;
1366 /// # use futures::StreamExt;
1367 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1368 /// let tick = process.tick();
1369 /// let watermark = tick.singleton(q!(1));
1370 /// let numbers = process
1371 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1372 /// .into_keyed();
1373 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1374 /// batch
1375 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1376 /// .entries()
1377 /// .all_ticks()
1378 /// # }, |mut stream| async move {
1379 /// // (2, 204)
1380 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1381 /// # }));
1382 /// ```
1383 pub fn reduce_watermark<O, F>(
1384 self,
1385 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1386 comb: impl IntoQuotedMut<'a, F, L>,
1387 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1388 where
1389 O: Clone,
1390 F: Fn(&mut V, V) + 'a,
1391 {
1392 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1393 check_matching_location(&self.location.root(), other.location.outer());
1394 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1395
1396 KeyedSingleton::new(
1397 self.location.clone(),
1398 HydroNode::ReduceKeyedWatermark {
1399 f,
1400 input: Box::new(self.ir_node.into_inner()),
1401 watermark: Box::new(other.ir_node.into_inner()),
1402 metadata: self.location.new_node_metadata(KeyedSingleton::<
1403 K,
1404 V,
1405 L,
1406 B::WhenValueUnbounded,
1407 >::collection_kind()),
1408 },
1409 )
1410 }
1411}
1412
1413impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1414where
1415 K: Eq + Hash,
1416 L: Location<'a>,
1417{
1418 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1419 ///
1420 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1421 ///
1422 /// If the input and output value types are the same and do not require initialization then use
1423 /// [`KeyedStream::reduce_commutative`].
1424 ///
1425 /// # Example
1426 /// ```rust
1427 /// # use hydro_lang::prelude::*;
1428 /// # use futures::StreamExt;
1429 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1430 /// let tick = process.tick();
1431 /// let numbers = process
1432 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1433 /// .into_keyed();
1434 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1435 /// batch
1436 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1437 /// .entries()
1438 /// .all_ticks()
1439 /// # }, |mut stream| async move {
1440 /// // (1, 5), (2, 7)
1441 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1442 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1443 /// # }));
1444 /// ```
1445 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1446 self,
1447 init: impl IntoQuotedMut<'a, I, L>,
1448 comb: impl IntoQuotedMut<'a, F, L>,
1449 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1450 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1451 .fold(init, comb)
1452 }
1453
1454 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1455 ///
1456 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1457 ///
1458 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1459 ///
1460 /// # Example
1461 /// ```rust
1462 /// # use hydro_lang::prelude::*;
1463 /// # use futures::StreamExt;
1464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1465 /// let tick = process.tick();
1466 /// let numbers = process
1467 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1468 /// .into_keyed();
1469 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1470 /// batch
1471 /// .reduce_commutative(q!(|acc, x| *acc += x))
1472 /// .entries()
1473 /// .all_ticks()
1474 /// # }, |mut stream| async move {
1475 /// // (1, 5), (2, 7)
1476 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1477 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1478 /// # }));
1479 /// ```
1480 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1481 self,
1482 comb: impl IntoQuotedMut<'a, F, L>,
1483 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1484 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1485 .reduce(comb)
1486 }
1487
1488 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1489 ///
1490 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1491 ///
1492 /// # Example
1493 /// ```rust
1494 /// # use hydro_lang::prelude::*;
1495 /// # use futures::StreamExt;
1496 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1497 /// let tick = process.tick();
1498 /// let watermark = tick.singleton(q!(1));
1499 /// let numbers = process
1500 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1501 /// .into_keyed();
1502 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1503 /// batch
1504 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1505 /// .entries()
1506 /// .all_ticks()
1507 /// # }, |mut stream| async move {
1508 /// // (2, 204)
1509 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1510 /// # }));
1511 /// ```
1512 pub fn reduce_watermark_commutative<O2, F>(
1513 self,
1514 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1515 comb: impl IntoQuotedMut<'a, F, L>,
1516 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1517 where
1518 O2: Clone,
1519 F: Fn(&mut V, V) + 'a,
1520 {
1521 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1522 .reduce_watermark(other, comb)
1523 }
1524}
1525
1526impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1527where
1528 K: Eq + Hash,
1529 L: Location<'a>,
1530{
1531 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1532 ///
1533 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1534 ///
1535 /// If the input and output value types are the same and do not require initialization then use
1536 /// [`KeyedStream::reduce_idempotent`].
1537 ///
1538 /// # Example
1539 /// ```rust
1540 /// # use hydro_lang::prelude::*;
1541 /// # use futures::StreamExt;
1542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1543 /// let tick = process.tick();
1544 /// let numbers = process
1545 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1546 /// .into_keyed();
1547 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548 /// batch
1549 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1550 /// .entries()
1551 /// .all_ticks()
1552 /// # }, |mut stream| async move {
1553 /// // (1, false), (2, true)
1554 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1555 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1556 /// # }));
1557 /// ```
1558 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1559 self,
1560 init: impl IntoQuotedMut<'a, I, L>,
1561 comb: impl IntoQuotedMut<'a, F, L>,
1562 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1563 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1564 .fold(init, comb)
1565 }
1566
1567 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1568 ///
1569 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1570 ///
1571 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1572 ///
1573 /// # Example
1574 /// ```rust
1575 /// # use hydro_lang::prelude::*;
1576 /// # use futures::StreamExt;
1577 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1578 /// let tick = process.tick();
1579 /// let numbers = process
1580 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1581 /// .into_keyed();
1582 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1583 /// batch
1584 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1585 /// .entries()
1586 /// .all_ticks()
1587 /// # }, |mut stream| async move {
1588 /// // (1, false), (2, true)
1589 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1590 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1591 /// # }));
1592 /// ```
1593 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1594 self,
1595 comb: impl IntoQuotedMut<'a, F, L>,
1596 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1597 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1598 .reduce(comb)
1599 }
1600
1601 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1602 ///
1603 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1604 ///
1605 /// # Example
1606 /// ```rust
1607 /// # use hydro_lang::prelude::*;
1608 /// # use futures::StreamExt;
1609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1610 /// let tick = process.tick();
1611 /// let watermark = tick.singleton(q!(1));
1612 /// let numbers = process
1613 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1614 /// .into_keyed();
1615 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1616 /// batch
1617 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1618 /// .entries()
1619 /// .all_ticks()
1620 /// # }, |mut stream| async move {
1621 /// // (2, true)
1622 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1623 /// # }));
1624 /// ```
1625 pub fn reduce_watermark_idempotent<O2, F>(
1626 self,
1627 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1628 comb: impl IntoQuotedMut<'a, F, L>,
1629 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1630 where
1631 O2: Clone,
1632 F: Fn(&mut V, V) + 'a,
1633 {
1634 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1635 .reduce_watermark(other, comb)
1636 }
1637}
1638
1639impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1640where
1641 K: Eq + Hash,
1642 L: Location<'a>,
1643{
1644 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1645 ///
1646 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1647 /// as there may be non-deterministic duplicates.
1648 ///
1649 /// If the input and output value types are the same and do not require initialization then use
1650 /// [`KeyedStream::reduce_commutative_idempotent`].
1651 ///
1652 /// # Example
1653 /// ```rust
1654 /// # use hydro_lang::prelude::*;
1655 /// # use futures::StreamExt;
1656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657 /// let tick = process.tick();
1658 /// let numbers = process
1659 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1660 /// .into_keyed();
1661 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1662 /// batch
1663 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1664 /// .entries()
1665 /// .all_ticks()
1666 /// # }, |mut stream| async move {
1667 /// // (1, false), (2, true)
1668 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1669 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1670 /// # }));
1671 /// ```
1672 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1673 self,
1674 init: impl IntoQuotedMut<'a, I, L>,
1675 comb: impl IntoQuotedMut<'a, F, L>,
1676 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1677 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1678 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1679 .fold(init, comb)
1680 }
1681
1682 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1683 ///
1684 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1685 /// as there may be non-deterministic duplicates.
1686 ///
1687 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1688 ///
1689 /// # Example
1690 /// ```rust
1691 /// # use hydro_lang::prelude::*;
1692 /// # use futures::StreamExt;
1693 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1694 /// let tick = process.tick();
1695 /// let numbers = process
1696 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1697 /// .into_keyed();
1698 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1699 /// batch
1700 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1701 /// .entries()
1702 /// .all_ticks()
1703 /// # }, |mut stream| async move {
1704 /// // (1, false), (2, true)
1705 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1706 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1707 /// # }));
1708 /// ```
1709 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1710 self,
1711 comb: impl IntoQuotedMut<'a, F, L>,
1712 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1713 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1714 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1715 .reduce(comb)
1716 }
1717
1718 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1719 ///
1720 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1721 /// as there may be non-deterministic duplicates.
1722 ///
1723 /// # Example
1724 /// ```rust
1725 /// # use hydro_lang::prelude::*;
1726 /// # use futures::StreamExt;
1727 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1728 /// let tick = process.tick();
1729 /// let watermark = tick.singleton(q!(1));
1730 /// let numbers = process
1731 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1732 /// .into_keyed();
1733 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1734 /// batch
1735 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1736 /// .entries()
1737 /// .all_ticks()
1738 /// # }, |mut stream| async move {
1739 /// // (2, true)
1740 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1741 /// # }));
1742 /// ```
1743 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1744 self,
1745 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1746 comb: impl IntoQuotedMut<'a, F, L>,
1747 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1748 where
1749 O2: Clone,
1750 F: Fn(&mut V, V) + 'a,
1751 {
1752 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1753 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1754 .reduce_watermark(other, comb)
1755 }
1756
1757 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1758 /// whose keys are not in the bounded stream.
1759 ///
1760 /// # Example
1761 /// ```rust
1762 /// # use hydro_lang::prelude::*;
1763 /// # use futures::StreamExt;
1764 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1765 /// let tick = process.tick();
1766 /// let keyed_stream = process
1767 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1768 /// .batch(&tick, nondet!(/** test */))
1769 /// .into_keyed();
1770 /// let keys_to_remove = process
1771 /// .source_iter(q!(vec![1, 2]))
1772 /// .batch(&tick, nondet!(/** test */));
1773 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1774 /// # .entries()
1775 /// # }, |mut stream| async move {
1776 /// // { 3: ['c'], 4: ['d'] }
1777 /// # for w in vec![(3, 'c'), (4, 'd')] {
1778 /// # assert_eq!(stream.next().await.unwrap(), w);
1779 /// # }
1780 /// # }));
1781 /// ```
1782 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1783 self,
1784 other: Stream<K, L, Bounded, O2, R2>,
1785 ) -> Self {
1786 check_matching_location(&self.location, &other.location);
1787
1788 KeyedStream::new(
1789 self.location.clone(),
1790 HydroNode::AntiJoin {
1791 pos: Box::new(self.ir_node.into_inner()),
1792 neg: Box::new(other.ir_node.into_inner()),
1793 metadata: self.location.new_node_metadata(Self::collection_kind()),
1794 },
1795 )
1796 }
1797}
1798
1799impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1800where
1801 L: Location<'a>,
1802{
1803 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1804 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1805 ///
1806 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1807 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1808 /// argument that declares where the stream will be atomically processed. Batching a stream into
1809 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1810 /// [`Tick`] will introduce asynchrony.
1811 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1812 let out_location = Atomic { tick: tick.clone() };
1813 KeyedStream::new(
1814 out_location.clone(),
1815 HydroNode::BeginAtomic {
1816 inner: Box::new(self.ir_node.into_inner()),
1817 metadata: out_location
1818 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1819 },
1820 )
1821 }
1822
1823 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1824 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1825 /// the order of the input.
1826 ///
1827 /// # Non-Determinism
1828 /// The batch boundaries are non-deterministic and may change across executions.
1829 pub fn batch(
1830 self,
1831 tick: &Tick<L>,
1832 nondet: NonDet,
1833 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1834 let _ = nondet;
1835 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1836 KeyedStream::new(
1837 tick.clone(),
1838 HydroNode::Batch {
1839 inner: Box::new(self.ir_node.into_inner()),
1840 metadata: tick.new_node_metadata(
1841 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1842 ),
1843 },
1844 )
1845 }
1846}
1847
1848impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1849where
1850 L: Location<'a> + NoTick,
1851{
1852 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1853 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1854 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1855 /// used to create the atomic section.
1856 ///
1857 /// # Non-Determinism
1858 /// The batch boundaries are non-deterministic and may change across executions.
1859 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1860 let _ = nondet;
1861 KeyedStream::new(
1862 self.location.clone().tick,
1863 HydroNode::Batch {
1864 inner: Box::new(self.ir_node.into_inner()),
1865 metadata: self.location.tick.new_node_metadata(KeyedStream::<
1866 K,
1867 V,
1868 Tick<L>,
1869 Bounded,
1870 O,
1871 R,
1872 >::collection_kind(
1873 )),
1874 },
1875 )
1876 }
1877
1878 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1879 /// See [`KeyedStream::atomic`] for more details.
1880 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1881 KeyedStream::new(
1882 self.location.tick.l.clone(),
1883 HydroNode::EndAtomic {
1884 inner: Box::new(self.ir_node.into_inner()),
1885 metadata: self
1886 .location
1887 .tick
1888 .l
1889 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1890 },
1891 )
1892 }
1893}
1894
1895impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1896where
1897 L: Location<'a>,
1898{
1899 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1900 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1901 /// is only present in one of the inputs, its values are passed through as-is). The output has
1902 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1903 ///
1904 /// Currently, both input streams must be [`Bounded`]. This operator will block
1905 /// on the first stream until all its elements are available. In a future version,
1906 /// we will relax the requirement on the `other` stream.
1907 ///
1908 /// # Example
1909 /// ```rust
1910 /// # use hydro_lang::prelude::*;
1911 /// # use futures::StreamExt;
1912 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1913 /// let tick = process.tick();
1914 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1915 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1916 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1917 /// # .entries()
1918 /// # }, |mut stream| async move {
1919 /// // { 0: [2, 1], 1: [4, 3] }
1920 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1921 /// # assert_eq!(stream.next().await.unwrap(), w);
1922 /// # }
1923 /// # }));
1924 /// ```
1925 pub fn chain<O2: Ordering, R2: Retries>(
1926 self,
1927 other: KeyedStream<K, V, L, Bounded, O2, R2>,
1928 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1929 where
1930 O: MinOrder<O2>,
1931 R: MinRetries<R2>,
1932 {
1933 check_matching_location(&self.location, &other.location);
1934
1935 KeyedStream::new(
1936 self.location.clone(),
1937 HydroNode::Chain {
1938 first: Box::new(self.ir_node.into_inner()),
1939 second: Box::new(other.ir_node.into_inner()),
1940 metadata: self.location.new_node_metadata(KeyedStream::<
1941 K,
1942 V,
1943 L,
1944 Bounded,
1945 <O as MinOrder<O2>>::Min,
1946 <R as MinRetries<R2>>::Min,
1947 >::collection_kind()),
1948 },
1949 )
1950 }
1951}
1952
1953impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1954where
1955 L: Location<'a>,
1956{
1957 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1958 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1959 /// each key.
1960 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1961 KeyedStream::new(
1962 self.location.outer().clone(),
1963 HydroNode::YieldConcat {
1964 inner: Box::new(self.ir_node.into_inner()),
1965 metadata: self.location.outer().new_node_metadata(KeyedStream::<
1966 K,
1967 V,
1968 L,
1969 Unbounded,
1970 O,
1971 R,
1972 >::collection_kind(
1973 )),
1974 },
1975 )
1976 }
1977
1978 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1979 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1980 /// each key.
1981 ///
1982 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1983 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1984 /// stream's [`Tick`] context.
1985 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
1986 let out_location = Atomic {
1987 tick: self.location.clone(),
1988 };
1989
1990 KeyedStream::new(
1991 out_location.clone(),
1992 HydroNode::YieldConcat {
1993 inner: Box::new(self.ir_node.into_inner()),
1994 metadata: out_location.new_node_metadata(KeyedStream::<
1995 K,
1996 V,
1997 Atomic<L>,
1998 Unbounded,
1999 O,
2000 R,
2001 >::collection_kind()),
2002 },
2003 )
2004 }
2005
2006 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2007 /// tick `T` always has the entries of `self` at tick `T - 1`.
2008 ///
2009 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2010 ///
2011 /// This operator enables stateful iterative processing with ticks, by sending data from one
2012 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2013 ///
2014 /// # Example
2015 /// ```rust
2016 /// # use hydro_lang::prelude::*;
2017 /// # use futures::StreamExt;
2018 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2019 /// let tick = process.tick();
2020 /// # // ticks are lazy by default, forces the second tick to run
2021 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2022 /// # let batch_first_tick = process
2023 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2024 /// # .batch(&tick, nondet!(/** test */))
2025 /// # .into_keyed();
2026 /// # let batch_second_tick = process
2027 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2028 /// # .batch(&tick, nondet!(/** test */))
2029 /// # .defer_tick()
2030 /// # .into_keyed(); // appears on the second tick
2031 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2032 /// # batch_first_tick.chain(batch_second_tick);
2033 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2034 /// changes_across_ticks // from the current tick
2035 /// )
2036 /// # .entries().all_ticks()
2037 /// # }, |mut stream| async move {
2038 /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2039 /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2040 /// # assert_eq!(stream.next().await.unwrap(), w);
2041 /// # }
2042 /// # }));
2043 /// ```
2044 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2045 KeyedStream::new(
2046 self.location.clone(),
2047 HydroNode::DeferTick {
2048 input: Box::new(self.ir_node.into_inner()),
2049 metadata: self.location.new_node_metadata(KeyedStream::<
2050 K,
2051 V,
2052 Tick<L>,
2053 Bounded,
2054 O,
2055 R,
2056 >::collection_kind()),
2057 },
2058 )
2059 }
2060}
2061
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Runs `reduce_watermark` with a constant watermark of `1` and expects the first entry
    /// received to be `(2, 204)`, the sum of the two values supplied for key `2`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed();
        let reduced = keyed_input.reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let out_port = reduced
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out_recv = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that starts at `1` and is
    /// incremented on every tick. The test first expects `(2, 204)`, then sends an external
    /// trigger that releases the `(3, 103)` input and expects `(3, 103)` as the next entry.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();
        let (trigger_port, trigger_stream) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();

        // Watermark cycle: each tick sees the previous tick's watermark plus one.
        let initial_watermark = tick.singleton(q!(1));
        let (watermark_cycle, watermark) = tick.cycle_with_initial(initial_watermark);
        let incremented = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(incremented);

        // The late input is only let through in ticks where a trigger message is present.
        let late_batch = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */));
        let trigger_in_tick = trigger_stream
            .clone()
            .batch(&tick, nondet!(/** test */))
            .first();
        let late_input = late_batch.filter_if_some(trigger_in_tick).all_ticks();

        let keyed_input = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(late_input)
            .into_keyed();
        let reduced = keyed_input.reduce_watermark_commutative(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        );
        let out_port = reduced
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_send = nodes.connect(trigger_port).await;
        let mut out_recv = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        trigger_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    /// Folds each batch of a keyed stream into a per-key `Vec` and asserts, under exhaustive
    /// simulation, that the only output is `(1, vec![1, 2])`. The test is marked
    /// `#[should_panic]`, so that assertion is expected not to hold.
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let folded = keyed_input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)));
        let out_port = folded
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut sim| {
            let received = sim.connect(&out_port);
            sim.launch();

            received
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }

    /// Batches a keyed stream, concatenates the batches again, and checks under exhaustive
    /// simulation that an early-stopping per-key fold always yields `(1, 2)` and `(2, 3)`.
    /// Also pins the number of explored simulation instances.
    #[test]
    fn sim_batch_preserves_group_order() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let rebatched = keyed_input
            .batch(&tick, nondet!(/** test */))
            .all_ticks();
        let folded = rebatched.fold_early_stop(
            q!(|| 0),
            q!(|acc, v| {
                *acc = std::cmp::max(v, *acc);
                *acc >= 2
            }),
        );
        let out_port = folded.entries().send_bincode_external(&external);

        let explored = flow.sim().exhaustive(async |mut sim| {
            let received = sim.connect(&out_port);
            sim.launch();

            received
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        // Expected instance count, broken down by how the three inputs are split into ticks:
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
        assert_eq!(explored, 8);
    }
}
2243}