hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::KeyedStream;
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::{Atomic, DeferTick, NoAtomic};
26use crate::location::{Location, NoTick, Tick, check_matching_location};
27use crate::nondet::{NonDet, nondet};
28
29pub mod networking;
30
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds require every implementor to provide [`MinOrder`] against
/// itself, [`TotalOrder`], and [`NoOrder`], with the results pinned as follows:
/// - the minimum with `Self` is `Self`
/// - the minimum with [`TotalOrder`] is `Self`
/// - the minimum with [`NoOrder`] is [`NoOrder`]
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
}
37
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it cannot be instantiated and is only used as a
/// type-level marker.
pub enum TotalOrder {}

// Registers `TotalOrder` as an `Ordering` marker; the trait declares no items to implement.
#[sealed::sealed]
impl Ordering for TotalOrder {}
45
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it cannot be instantiated and is only used as a
/// type-level marker.
pub enum NoOrder {}

// Registers `NoOrder` as an `Ordering` marker; the trait declares no items to implement.
#[sealed::sealed]
impl Ordering for NoOrder {}
55
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` is one ordering marker and `Other` is the marker it is compared against;
/// the result of the comparison is exposed as the associated type [`MinOrder::Min`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
62
// The four impls below enumerate every pairing of `TotalOrder` and `NoOrder`.
// The result is `TotalOrder` only when both sides are `TotalOrder`; otherwise it is `NoOrder`.

// min(TotalOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// min(TotalOrder, TotalOrder) = TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// min(NoOrder, TotalOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// min(NoOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
82
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds require every implementor to provide [`MinRetries`] against
/// itself, [`ExactlyOnce`], and [`AtLeastOnce`], with the results pinned as follows:
/// - the minimum with `Self` is `Self`
/// - the minimum with [`ExactlyOnce`] is `Self`
/// - the minimum with [`AtLeastOnce`] is [`AtLeastOnce`]
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
}
91
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it cannot be instantiated and is only used as a
/// type-level marker.
pub enum ExactlyOnce {}

// Registers `ExactlyOnce` as a `Retries` marker; the trait declares no items to implement.
#[sealed::sealed]
impl Retries for ExactlyOnce {}
98
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it cannot be instantiated and is only used as a
/// type-level marker.
pub enum AtLeastOnce {}

// Registers `AtLeastOnce` as a `Retries` marker; the trait declares no items to implement.
#[sealed::sealed]
impl Retries for AtLeastOnce {}
105
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` is one retries marker and `Other` is the marker it is compared against;
/// the result of the comparison is exposed as the associated type [`MinRetries::Min`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
112
// The four impls below enumerate every pairing of `ExactlyOnce` and `AtLeastOnce`.
// The result is `ExactlyOnce` only when both sides are `ExactlyOnce`; otherwise it is
// `AtLeastOnce`.

// min(ExactlyOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// min(ExactlyOnce, ExactlyOnce) = ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// min(AtLeastOnce, ExactlyOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// min(AtLeastOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
132
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location value this stream is attached to.
    pub(crate) location: Loc,
    // The IR node backing this stream. It sits in a `RefCell` so that it can be replaced
    // or edited through a shared reference (as `Clone::clone` and `ir_node_named` do).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct without storing any data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
164
165impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
166 for Stream<T, L, Unbounded, O, R>
167where
168 L: Location<'a>,
169{
170 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
171 Stream {
172 location: stream.location,
173 ir_node: stream.ir_node,
174 _phantom: PhantomData,
175 }
176 }
177}
178
179impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
180 for Stream<T, L, B, NoOrder, R>
181where
182 L: Location<'a>,
183{
184 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
185 Stream {
186 location: stream.location,
187 ir_node: stream.ir_node,
188 _phantom: PhantomData,
189 }
190 }
191}
192
193impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
194 for Stream<T, L, B, O, AtLeastOnce>
195where
196 L: Location<'a>,
197{
198 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
199 Stream {
200 location: stream.location,
201 ir_node: stream.ir_node,
202 _phantom: PhantomData,
203 }
204 }
205}
206
// Trait adapter: implements `DeferTick` for bounded streams inside a `Tick` by forwarding
// to `Stream::defer_tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this path call is presumably meant to resolve to an inherent
        // `defer_tick` method on `Stream` defined elsewhere in this file — confirm, since
        // that method is not visible from this part of the source.
        Stream::defer_tick(self)
    }
}
215
216impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
217 for Stream<T, Tick<L>, Bounded, O, R>
218where
219 L: Location<'a>,
220{
221 type Location = Tick<L>;
222
223 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
224 Stream::new(
225 location.clone(),
226 HydroNode::CycleSource {
227 ident,
228 metadata: location.new_node_metadata::<T>(),
229 },
230 )
231 }
232}
233
234impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
235 for Stream<T, Tick<L>, Bounded, O, R>
236where
237 L: Location<'a>,
238{
239 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
240 assert_eq!(
241 Location::id(&self.location),
242 expected_location,
243 "locations do not match"
244 );
245 self.location
246 .flow_state()
247 .borrow_mut()
248 .push_root(HydroRoot::CycleSink {
249 ident,
250 input: Box::new(self.ir_node.into_inner()),
251 out_location: Location::id(&self.location),
252 op_metadata: HydroIrOpMetadata::new(),
253 });
254 }
255}
256
257impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
258 for Stream<T, L, B, O, R>
259where
260 L: Location<'a> + NoTick,
261{
262 type Location = L;
263
264 fn create_source(ident: syn::Ident, location: L) -> Self {
265 Stream::new(
266 location.clone(),
267 HydroNode::Persist {
268 inner: Box::new(HydroNode::CycleSource {
269 ident,
270 metadata: location.new_node_metadata::<T>(),
271 }),
272 metadata: location.new_node_metadata::<T>(),
273 },
274 )
275 }
276}
277
278impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
279 for Stream<T, L, B, O, R>
280where
281 L: Location<'a> + NoTick,
282{
283 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
284 assert_eq!(
285 Location::id(&self.location),
286 expected_location,
287 "locations do not match"
288 );
289 let metadata = self.location.new_node_metadata::<T>();
290 self.location
291 .flow_state()
292 .borrow_mut()
293 .push_root(HydroRoot::CycleSink {
294 ident,
295 input: Box::new(HydroNode::Unpersist {
296 inner: Box::new(self.ir_node.into_inner()),
297 metadata: metadata.clone(),
298 }),
299 out_location: Location::id(&self.location),
300 op_metadata: HydroIrOpMetadata::new(),
301 });
302 }
303}
304
/// Cloning a stream shares its upstream IR through a [`HydroNode::Tee`].
///
/// Note that `clone` takes `&self` but still rewrites the receiver: if its IR node is not
/// already a `Tee`, the node is replaced in place (through the `RefCell`) by a `Tee` that
/// wraps the original node. The returned stream gets its own `Tee` node that points at the
/// same shared inner node.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // First clone of this stream: lift the current node into a shared `Tee`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Swap a placeholder in so the original node can be moved out of the `RefCell`.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        // The node is a `Tee` here: either it already was, or the branch above made it one.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    // Cloning the `Rc` makes both streams refer to the same inner node.
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
336where
337 L: Location<'a>,
338{
339 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
340 Stream {
341 location,
342 ir_node: RefCell::new(ir_node),
343 _phantom: PhantomData,
344 }
345 }
346
347 /// Produces a stream based on invoking `f` on each element.
348 /// If you do not want to modify the stream and instead only want to view
349 /// each item use [`Stream::inspect`] instead.
350 ///
351 /// # Example
352 /// ```rust
353 /// # use hydro_lang::prelude::*;
354 /// # use futures::StreamExt;
355 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
356 /// let words = process.source_iter(q!(vec!["hello", "world"]));
357 /// words.map(q!(|x| x.to_uppercase()))
358 /// # }, |mut stream| async move {
359 /// # for w in vec!["HELLO", "WORLD"] {
360 /// # assert_eq!(stream.next().await.unwrap(), w);
361 /// # }
362 /// # }));
363 /// ```
364 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
365 where
366 F: Fn(T) -> U + 'a,
367 {
368 let f = f.splice_fn1_ctx(&self.location).into();
369 Stream::new(
370 self.location.clone(),
371 HydroNode::Map {
372 f,
373 input: Box::new(self.ir_node.into_inner()),
374 metadata: self.location.new_node_metadata::<U>(),
375 },
376 )
377 }
378
379 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
380 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
381 /// for the output type `U` must produce items in a **deterministic** order.
382 ///
383 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
384 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
385 ///
386 /// # Example
387 /// ```rust
388 /// # use hydro_lang::prelude::*;
389 /// # use futures::StreamExt;
390 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
391 /// process
392 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
393 /// .flat_map_ordered(q!(|x| x))
394 /// # }, |mut stream| async move {
395 /// // 1, 2, 3, 4
396 /// # for w in (1..5) {
397 /// # assert_eq!(stream.next().await.unwrap(), w);
398 /// # }
399 /// # }));
400 /// ```
401 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
402 where
403 I: IntoIterator<Item = U>,
404 F: Fn(T) -> I + 'a,
405 {
406 let f = f.splice_fn1_ctx(&self.location).into();
407 Stream::new(
408 self.location.clone(),
409 HydroNode::FlatMap {
410 f,
411 input: Box::new(self.ir_node.into_inner()),
412 metadata: self.location.new_node_metadata::<U>(),
413 },
414 )
415 }
416
417 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
418 /// for the output type `U` to produce items in any order.
419 ///
420 /// # Example
421 /// ```rust
422 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
423 /// # use futures::StreamExt;
424 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
425 /// process
426 /// .source_iter(q!(vec![
427 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
428 /// std::collections::HashSet::from_iter(vec![3, 4]),
429 /// ]))
430 /// .flat_map_unordered(q!(|x| x))
431 /// # }, |mut stream| async move {
432 /// // 1, 2, 3, 4, but in no particular order
433 /// # let mut results = Vec::new();
434 /// # for w in (1..5) {
435 /// # results.push(stream.next().await.unwrap());
436 /// # }
437 /// # results.sort();
438 /// # assert_eq!(results, vec![1, 2, 3, 4]);
439 /// # }));
440 /// ```
441 pub fn flat_map_unordered<U, I, F>(
442 self,
443 f: impl IntoQuotedMut<'a, F, L>,
444 ) -> Stream<U, L, B, NoOrder, R>
445 where
446 I: IntoIterator<Item = U>,
447 F: Fn(T) -> I + 'a,
448 {
449 let f = f.splice_fn1_ctx(&self.location).into();
450 Stream::new(
451 self.location.clone(),
452 HydroNode::FlatMap {
453 f,
454 input: Box::new(self.ir_node.into_inner()),
455 metadata: self.location.new_node_metadata::<U>(),
456 },
457 )
458 }
459
460 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
461 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
462 ///
463 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
464 /// not deterministic, use [`Stream::flatten_unordered`] instead.
465 ///
466 /// ```rust
467 /// # use hydro_lang::prelude::*;
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
470 /// process
471 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
472 /// .flatten_ordered()
473 /// # }, |mut stream| async move {
474 /// // 1, 2, 3, 4
475 /// # for w in (1..5) {
476 /// # assert_eq!(stream.next().await.unwrap(), w);
477 /// # }
478 /// # }));
479 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: each element is itself the
        // iterable whose items are emitted.
        self.flat_map_ordered(q!(|d| d))
    }
486
487 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
488 /// for the element type `T` to produce items in any order.
489 ///
490 /// # Example
491 /// ```rust
492 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
495 /// process
496 /// .source_iter(q!(vec![
497 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
498 /// std::collections::HashSet::from_iter(vec![3, 4]),
499 /// ]))
500 /// .flatten_unordered()
501 /// # }, |mut stream| async move {
502 /// // 1, 2, 3, 4, but in no particular order
503 /// # let mut results = Vec::new();
504 /// # for w in (1..5) {
505 /// # results.push(stream.next().await.unwrap());
506 /// # }
507 /// # results.sort();
508 /// # assert_eq!(results, vec![1, 2, 3, 4]);
509 /// # }));
510 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure; delegating to the unordered
        // variant is what gives the result its `NoOrder` marker.
        self.flat_map_unordered(q!(|d| d))
    }
517
518 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
519 /// `f`, preserving the order of the elements.
520 ///
521 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
522 /// not modify or take ownership of the values. If you need to modify the values while filtering
523 /// use [`Stream::filter_map`] instead.
524 ///
525 /// # Example
526 /// ```rust
527 /// # use hydro_lang::prelude::*;
528 /// # use futures::StreamExt;
529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530 /// process
531 /// .source_iter(q!(vec![1, 2, 3, 4]))
532 /// .filter(q!(|&x| x > 2))
533 /// # }, |mut stream| async move {
534 /// // 3, 4
535 /// # for w in (3..5) {
536 /// # assert_eq!(stream.next().await.unwrap(), w);
537 /// # }
538 /// # }));
539 /// ```
540 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
541 where
542 F: Fn(&T) -> bool + 'a,
543 {
544 let f = f.splice_fn1_borrow_ctx(&self.location).into();
545 Stream::new(
546 self.location.clone(),
547 HydroNode::Filter {
548 f,
549 input: Box::new(self.ir_node.into_inner()),
550 metadata: self.location.new_node_metadata::<T>(),
551 },
552 )
553 }
554
555 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
556 ///
557 /// # Example
558 /// ```rust
559 /// # use hydro_lang::prelude::*;
560 /// # use futures::StreamExt;
561 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
562 /// process
563 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
564 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
565 /// # }, |mut stream| async move {
566 /// // 1, 2
567 /// # for w in (1..3) {
568 /// # assert_eq!(stream.next().await.unwrap(), w);
569 /// # }
570 /// # }));
571 /// ```
572 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
573 where
574 F: Fn(T) -> Option<U> + 'a,
575 {
576 let f = f.splice_fn1_ctx(&self.location).into();
577 Stream::new(
578 self.location.clone(),
579 HydroNode::FilterMap {
580 f,
581 input: Box::new(self.ir_node.into_inner()),
582 metadata: self.location.new_node_metadata::<U>(),
583 },
584 )
585 }
586
587 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
588 /// where `x` is the final value of `other`, a bounded [`Singleton`].
589 ///
590 /// # Example
591 /// ```rust
592 /// # use hydro_lang::prelude::*;
593 /// # use futures::StreamExt;
594 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
595 /// let tick = process.tick();
596 /// let batch = process
597 /// .source_iter(q!(vec![1, 2, 3, 4]))
598 /// .batch(&tick, nondet!(/** test */));
599 /// let count = batch.clone().count(); // `count()` returns a singleton
600 /// batch.cross_singleton(count).all_ticks()
601 /// # }, |mut stream| async move {
602 /// // (1, 4), (2, 4), (3, 4), (4, 4)
603 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
604 /// # assert_eq!(stream.next().await.unwrap(), w);
605 /// # }
606 /// # }));
607 /// ```
608 pub fn cross_singleton<O2>(
609 self,
610 other: impl Into<Optional<O2, L, Bounded>>,
611 ) -> Stream<(T, O2), L, B, O, R>
612 where
613 O2: Clone,
614 {
615 let other: Optional<O2, L, Bounded> = other.into();
616 check_matching_location(&self.location, &other.location);
617
618 Stream::new(
619 self.location.clone(),
620 HydroNode::CrossSingleton {
621 left: Box::new(self.ir_node.into_inner()),
622 right: Box::new(other.ir_node.into_inner()),
623 metadata: self.location.new_node_metadata::<(T, O2)>(),
624 },
625 )
626 }
627
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
629 ///
630 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
631 /// leader of a cluster.
632 ///
633 /// # Example
634 /// ```rust
635 /// # use hydro_lang::prelude::*;
636 /// # use futures::StreamExt;
637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638 /// let tick = process.tick();
639 /// // ticks are lazy by default, forces the second tick to run
640 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
641 ///
642 /// let batch_first_tick = process
643 /// .source_iter(q!(vec![1, 2, 3, 4]))
644 /// .batch(&tick, nondet!(/** test */));
645 /// let batch_second_tick = process
646 /// .source_iter(q!(vec![5, 6, 7, 8]))
647 /// .batch(&tick, nondet!(/** test */))
648 /// .defer_tick(); // appears on the second tick
649 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
650 /// batch_first_tick.chain(batch_second_tick)
651 /// .filter_if_some(some_on_first_tick)
652 /// .all_ticks()
653 /// # }, |mut stream| async move {
654 /// // [1, 2, 3, 4]
655 /// # for w in vec![1, 2, 3, 4] {
656 /// # assert_eq!(stream.next().await.unwrap(), w);
657 /// # }
658 /// # }));
659 /// ```
660 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
661 self.cross_singleton(signal.map(q!(|_u| ())))
662 .map(q!(|(d, _signal)| d))
663 }
664
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
666 ///
667 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
668 /// some local state.
669 ///
670 /// # Example
671 /// ```rust
672 /// # use hydro_lang::prelude::*;
673 /// # use futures::StreamExt;
674 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
675 /// let tick = process.tick();
676 /// // ticks are lazy by default, forces the second tick to run
677 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
678 ///
679 /// let batch_first_tick = process
680 /// .source_iter(q!(vec![1, 2, 3, 4]))
681 /// .batch(&tick, nondet!(/** test */));
682 /// let batch_second_tick = process
683 /// .source_iter(q!(vec![5, 6, 7, 8]))
684 /// .batch(&tick, nondet!(/** test */))
685 /// .defer_tick(); // appears on the second tick
686 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
687 /// batch_first_tick.chain(batch_second_tick)
688 /// .filter_if_none(some_on_first_tick)
689 /// .all_ticks()
690 /// # }, |mut stream| async move {
691 /// // [5, 6, 7, 8]
692 /// # for w in vec![5, 6, 7, 8] {
693 /// # assert_eq!(stream.next().await.unwrap(), w);
694 /// # }
695 /// # }));
696 /// ```
697 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
698 self.filter_if_some(
699 other
700 .map(q!(|_| ()))
701 .into_singleton()
702 .filter(q!(|o| o.is_none())),
703 )
704 }
705
706 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
707 /// tupled pairs in a non-deterministic order.
708 ///
709 /// # Example
710 /// ```rust
711 /// # use hydro_lang::prelude::*;
712 /// # use std::collections::HashSet;
713 /// # use futures::StreamExt;
714 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
715 /// let tick = process.tick();
716 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
717 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
718 /// stream1.cross_product(stream2)
719 /// # }, |mut stream| async move {
720 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # for _ in 0..expected.len() {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
722 /// # }));
723 /// ```
724 pub fn cross_product<T2, O2: Ordering>(
725 self,
726 other: Stream<T2, L, B, O2, R>,
727 ) -> Stream<(T, T2), L, B, NoOrder, R>
728 where
729 T: Clone,
730 T2: Clone,
731 {
732 check_matching_location(&self.location, &other.location);
733
734 Stream::new(
735 self.location.clone(),
736 HydroNode::CrossProduct {
737 left: Box::new(self.ir_node.into_inner()),
738 right: Box::new(other.ir_node.into_inner()),
739 metadata: self.location.new_node_metadata::<(T, T2)>(),
740 },
741 )
742 }
743
744 /// Takes one stream as input and filters out any duplicate occurrences. The output
745 /// contains all unique values from the input.
746 ///
747 /// # Example
748 /// ```rust
749 /// # use hydro_lang::prelude::*;
750 /// # use futures::StreamExt;
751 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
752 /// let tick = process.tick();
753 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
754 /// # }, |mut stream| async move {
755 /// # for w in vec![1, 2, 3, 4] {
756 /// # assert_eq!(stream.next().await.unwrap(), w);
757 /// # }
758 /// # }));
759 /// ```
760 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
761 where
762 T: Eq + Hash,
763 {
764 Stream::new(
765 self.location.clone(),
766 HydroNode::Unique {
767 input: Box::new(self.ir_node.into_inner()),
768 metadata: self.location.new_node_metadata::<T>(),
769 },
770 )
771 }
772
773 /// Outputs everything in this stream that is *not* contained in the `other` stream.
774 ///
775 /// The `other` stream must be [`Bounded`], since this function will wait until
776 /// all its elements are available before producing any output.
    ///
    /// # Example
778 /// ```rust
779 /// # use hydro_lang::prelude::*;
780 /// # use futures::StreamExt;
781 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782 /// let tick = process.tick();
783 /// let stream = process
784 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
785 /// .batch(&tick, nondet!(/** test */));
786 /// let batch = process
787 /// .source_iter(q!(vec![1, 2]))
788 /// .batch(&tick, nondet!(/** test */));
789 /// stream.filter_not_in(batch).all_ticks()
790 /// # }, |mut stream| async move {
791 /// # for w in vec![3, 4] {
792 /// # assert_eq!(stream.next().await.unwrap(), w);
793 /// # }
794 /// # }));
795 /// ```
796 pub fn filter_not_in<O2: Ordering>(
797 self,
798 other: Stream<T, L, Bounded, O2, R>,
799 ) -> Stream<T, L, Bounded, O, R>
800 where
801 T: Eq + Hash,
802 {
803 check_matching_location(&self.location, &other.location);
804
805 Stream::new(
806 self.location.clone(),
807 HydroNode::Difference {
808 pos: Box::new(self.ir_node.into_inner()),
809 neg: Box::new(other.ir_node.into_inner()),
810 metadata: self.location.new_node_metadata::<T>(),
811 },
812 )
813 }
814
815 /// An operator which allows you to "inspect" each element of a stream without
816 /// modifying it. The closure `f` is called on a reference to each item. This is
817 /// mainly useful for debugging, and should not be used to generate side-effects.
818 ///
819 /// # Example
820 /// ```rust
821 /// # use hydro_lang::prelude::*;
822 /// # use futures::StreamExt;
823 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
824 /// let nums = process.source_iter(q!(vec![1, 2]));
825 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
826 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
827 /// # }, |mut stream| async move {
828 /// # for w in vec![1, 2] {
829 /// # assert_eq!(stream.next().await.unwrap(), w);
830 /// # }
831 /// # }));
832 /// ```
833 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
834 where
835 F: Fn(&T) + 'a,
836 {
837 let f = f.splice_fn1_borrow_ctx(&self.location).into();
838
839 if L::is_top_level() {
840 Stream::new(
841 self.location.clone(),
842 HydroNode::Persist {
843 inner: Box::new(HydroNode::Inspect {
844 f,
845 input: Box::new(HydroNode::Unpersist {
846 inner: Box::new(self.ir_node.into_inner()),
847 metadata: self.location.new_node_metadata::<T>(),
848 }),
849 metadata: self.location.new_node_metadata::<T>(),
850 }),
851 metadata: self.location.new_node_metadata::<T>(),
852 },
853 )
854 } else {
855 Stream::new(
856 self.location.clone(),
857 HydroNode::Inspect {
858 f,
859 input: Box::new(self.ir_node.into_inner()),
860 metadata: self.location.new_node_metadata::<T>(),
861 },
862 )
863 }
864 }
865
866 /// An operator which allows you to "name" a `HydroNode`.
867 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
868 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
869 {
870 let mut node = self.ir_node.borrow_mut();
871 let metadata = node.metadata_mut();
872 metadata.tag = Some(name.to_string());
873 }
874 self
875 }
876
877 /// Explicitly "casts" the stream to a type with a different ordering
878 /// guarantee. Useful in unsafe code where the ordering cannot be proven
879 /// by the type-system.
880 ///
881 /// # Non-Determinism
882 /// This function is used as an escape hatch, and any mistakes in the
883 /// provided ordering guarantee will propagate into the guarantees
884 /// for the rest of the program.
885 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
886 Stream::new(self.location, self.ir_node.into_inner())
887 }
888
889 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
890 /// which is always safe because that is the weakest possible guarantee.
891 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
892 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
893 self.assume_ordering::<NoOrder>(nondet)
894 }
895
896 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
897 /// enforcing that `O2` is weaker than the input ordering guarantee.
898 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
899 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
900 self.assume_ordering::<O2>(nondet)
901 }
902
903 /// Explicitly "casts" the stream to a type with a different retries
904 /// guarantee. Useful in unsafe code where the lack of retries cannot
905 /// be proven by the type-system.
906 ///
907 /// # Non-Determinism
908 /// This function is used as an escape hatch, and any mistakes in the
909 /// provided retries guarantee will propagate into the guarantees
910 /// for the rest of the program.
911 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
912 Stream::new(self.location, self.ir_node.into_inner())
913 }
914
915 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
916 /// which is always safe because that is the weakest possible guarantee.
917 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
918 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
919 self.assume_retries::<AtLeastOnce>(nondet)
920 }
921
922 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
923 /// enforcing that `R2` is weaker than the input retries guarantee.
924 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
925 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
926 self.assume_retries::<R2>(nondet)
927 }
928}
929
930impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
931where
932 L: Location<'a>,
933{
934 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
935 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
936 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
937 self.assume_retries(
938 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
939 )
940 }
941}
942
// Methods available only on streams whose elements are references (`&T`).
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// The boundedness, ordering, and retries markers of the stream are unchanged.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Implemented as a plain `map`, so it builds the same IR node that `map` builds.
        self.map(q!(|d| d.clone()))
    }
}
969
970impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
971where
972 L: Location<'a>,
973{
974 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
975 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
976 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
977 ///
978 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
979 /// and there may be duplicates.
980 ///
981 /// # Example
982 /// ```rust
983 /// # use hydro_lang::prelude::*;
984 /// # use futures::StreamExt;
985 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
986 /// let tick = process.tick();
987 /// let bools = process.source_iter(q!(vec![false, true, false]));
988 /// let batch = bools.batch(&tick, nondet!(/** test */));
989 /// batch
990 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
991 /// .all_ticks()
992 /// # }, |mut stream| async move {
993 /// // true
994 /// # assert_eq!(stream.next().await.unwrap(), true);
995 /// # }));
996 /// ```
997 pub fn fold_commutative_idempotent<A, I, F>(
998 self,
999 init: impl IntoQuotedMut<'a, I, L>,
1000 comb: impl IntoQuotedMut<'a, F, L>,
1001 ) -> Singleton<A, L, B>
1002 where
1003 I: Fn() -> A + 'a,
1004 F: Fn(&mut A, T),
1005 {
1006 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1007 self.assume_ordering(nondet)
1008 .assume_retries(nondet)
1009 .fold(init, comb)
1010 }
1011
1012 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1013 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1014 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1015 /// reference, so that it can be modified in place.
1016 ///
1017 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1018 /// and there may be duplicates.
1019 ///
1020 /// # Example
1021 /// ```rust
1022 /// # use hydro_lang::prelude::*;
1023 /// # use futures::StreamExt;
1024 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1025 /// let tick = process.tick();
1026 /// let bools = process.source_iter(q!(vec![false, true, false]));
1027 /// let batch = bools.batch(&tick, nondet!(/** test */));
1028 /// batch
1029 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1030 /// .all_ticks()
1031 /// # }, |mut stream| async move {
1032 /// // true
1033 /// # assert_eq!(stream.next().await.unwrap(), true);
1034 /// # }));
1035 /// ```
1036 pub fn reduce_commutative_idempotent<F>(
1037 self,
1038 comb: impl IntoQuotedMut<'a, F, L>,
1039 ) -> Optional<T, L, B>
1040 where
1041 F: Fn(&mut T, T) + 'a,
1042 {
1043 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1044 self.assume_ordering(nondet)
1045 .assume_retries(nondet)
1046 .reduce(comb)
1047 }
1048
1049 /// Computes the maximum element in the stream as an [`Optional`], which
1050 /// will be empty until the first element in the input arrives.
1051 ///
1052 /// # Example
1053 /// ```rust
1054 /// # use hydro_lang::prelude::*;
1055 /// # use futures::StreamExt;
1056 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1057 /// let tick = process.tick();
1058 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1059 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1060 /// batch.max().all_ticks()
1061 /// # }, |mut stream| async move {
1062 /// // 4
1063 /// # assert_eq!(stream.next().await.unwrap(), 4);
1064 /// # }));
1065 /// ```
1066 pub fn max(self) -> Optional<T, L, B>
1067 where
1068 T: Ord,
1069 {
1070 self.reduce_commutative_idempotent(q!(|curr, new| {
1071 if new > *curr {
1072 *curr = new;
1073 }
1074 }))
1075 }
1076
1077 /// Computes the maximum element in the stream as an [`Optional`], where the
1078 /// maximum is determined according to the `key` function. The [`Optional`] will
1079 /// be empty until the first element in the input arrives.
1080 ///
1081 /// # Example
1082 /// ```rust
1083 /// # use hydro_lang::prelude::*;
1084 /// # use futures::StreamExt;
1085 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086 /// let tick = process.tick();
1087 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1088 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1089 /// batch.max_by_key(q!(|x| -x)).all_ticks()
1090 /// # }, |mut stream| async move {
1091 /// // 1
1092 /// # assert_eq!(stream.next().await.unwrap(), 1);
1093 /// # }));
1094 /// ```
1095 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1096 where
1097 K: Ord,
1098 F: Fn(&T) -> K + 'a,
1099 {
1100 let f = key.splice_fn1_borrow_ctx(&self.location);
1101
1102 let wrapped: syn::Expr = parse_quote!({
1103 let key_fn = #f;
1104 move |curr, new| {
1105 if key_fn(&new) > key_fn(&*curr) {
1106 *curr = new;
1107 }
1108 }
1109 });
1110
1111 let mut core = HydroNode::Reduce {
1112 f: wrapped.into(),
1113 input: Box::new(self.ir_node.into_inner()),
1114 metadata: self.location.new_node_metadata::<T>(),
1115 };
1116
1117 if L::is_top_level() {
1118 core = HydroNode::Persist {
1119 inner: Box::new(core),
1120 metadata: self.location.new_node_metadata::<T>(),
1121 };
1122 }
1123
1124 Optional::new(self.location, core)
1125 }
1126
1127 /// Computes the minimum element in the stream as an [`Optional`], which
1128 /// will be empty until the first element in the input arrives.
1129 ///
1130 /// # Example
1131 /// ```rust
1132 /// # use hydro_lang::prelude::*;
1133 /// # use futures::StreamExt;
1134 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1135 /// let tick = process.tick();
1136 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1138 /// batch.min().all_ticks()
1139 /// # }, |mut stream| async move {
1140 /// // 1
1141 /// # assert_eq!(stream.next().await.unwrap(), 1);
1142 /// # }));
1143 /// ```
1144 pub fn min(self) -> Optional<T, L, B>
1145 where
1146 T: Ord,
1147 {
1148 self.reduce_commutative_idempotent(q!(|curr, new| {
1149 if new < *curr {
1150 *curr = new;
1151 }
1152 }))
1153 }
1154}
1155
1156impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1157where
1158 L: Location<'a>,
1159{
1160 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1161 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1162 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1163 ///
1164 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1165 ///
1166 /// # Example
1167 /// ```rust
1168 /// # use hydro_lang::prelude::*;
1169 /// # use futures::StreamExt;
1170 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171 /// let tick = process.tick();
1172 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1173 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1174 /// batch
1175 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1176 /// .all_ticks()
1177 /// # }, |mut stream| async move {
1178 /// // 10
1179 /// # assert_eq!(stream.next().await.unwrap(), 10);
1180 /// # }));
1181 /// ```
1182 pub fn fold_commutative<A, I, F>(
1183 self,
1184 init: impl IntoQuotedMut<'a, I, L>,
1185 comb: impl IntoQuotedMut<'a, F, L>,
1186 ) -> Singleton<A, L, B>
1187 where
1188 I: Fn() -> A + 'a,
1189 F: Fn(&mut A, T),
1190 {
1191 let nondet = nondet!(/** the combinator function is commutative */);
1192 self.assume_ordering(nondet).fold(init, comb)
1193 }
1194
1195 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1196 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1197 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1198 /// reference, so that it can be modified in place.
1199 ///
1200 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1201 ///
1202 /// # Example
1203 /// ```rust
1204 /// # use hydro_lang::prelude::*;
1205 /// # use futures::StreamExt;
1206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1207 /// let tick = process.tick();
1208 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1209 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1210 /// batch
1211 /// .reduce_commutative(q!(|curr, new| *curr += new))
1212 /// .all_ticks()
1213 /// # }, |mut stream| async move {
1214 /// // 10
1215 /// # assert_eq!(stream.next().await.unwrap(), 10);
1216 /// # }));
1217 /// ```
1218 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1219 where
1220 F: Fn(&mut T, T) + 'a,
1221 {
1222 let nondet = nondet!(/** the combinator function is commutative */);
1223 self.assume_ordering(nondet).reduce(comb)
1224 }
1225
1226 /// Computes the number of elements in the stream as a [`Singleton`].
1227 ///
1228 /// # Example
1229 /// ```rust
1230 /// # use hydro_lang::prelude::*;
1231 /// # use futures::StreamExt;
1232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233 /// let tick = process.tick();
1234 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1235 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1236 /// batch.count().all_ticks()
1237 /// # }, |mut stream| async move {
1238 /// // 4
1239 /// # assert_eq!(stream.next().await.unwrap(), 4);
1240 /// # }));
1241 /// ```
1242 pub fn count(self) -> Singleton<usize, L, B> {
1243 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1244 }
1245}
1246
1247impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1248where
1249 L: Location<'a>,
1250{
1251 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1252 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1253 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1254 ///
1255 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1256 ///
1257 /// # Example
1258 /// ```rust
1259 /// # use hydro_lang::prelude::*;
1260 /// # use futures::StreamExt;
1261 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262 /// let tick = process.tick();
1263 /// let bools = process.source_iter(q!(vec![false, true, false]));
1264 /// let batch = bools.batch(&tick, nondet!(/** test */));
1265 /// batch
1266 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1267 /// .all_ticks()
1268 /// # }, |mut stream| async move {
1269 /// // true
1270 /// # assert_eq!(stream.next().await.unwrap(), true);
1271 /// # }));
1272 /// ```
1273 pub fn fold_idempotent<A, I, F>(
1274 self,
1275 init: impl IntoQuotedMut<'a, I, L>,
1276 comb: impl IntoQuotedMut<'a, F, L>,
1277 ) -> Singleton<A, L, B>
1278 where
1279 I: Fn() -> A + 'a,
1280 F: Fn(&mut A, T),
1281 {
1282 let nondet = nondet!(/** the combinator function is idempotent */);
1283 self.assume_retries(nondet).fold(init, comb)
1284 }
1285
1286 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1287 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1288 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1289 /// reference, so that it can be modified in place.
1290 ///
1291 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1292 ///
1293 /// # Example
1294 /// ```rust
1295 /// # use hydro_lang::prelude::*;
1296 /// # use futures::StreamExt;
1297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298 /// let tick = process.tick();
1299 /// let bools = process.source_iter(q!(vec![false, true, false]));
1300 /// let batch = bools.batch(&tick, nondet!(/** test */));
1301 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1302 /// # }, |mut stream| async move {
1303 /// // true
1304 /// # assert_eq!(stream.next().await.unwrap(), true);
1305 /// # }));
1306 /// ```
1307 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1308 where
1309 F: Fn(&mut T, T) + 'a,
1310 {
1311 let nondet = nondet!(/** the combinator function is idempotent */);
1312 self.assume_retries(nondet).reduce(comb)
1313 }
1314
1315 /// Computes the first element in the stream as an [`Optional`], which
1316 /// will be empty until the first element in the input arrives.
1317 ///
1318 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1319 /// re-ordering of elements may cause the first element to change.
1320 ///
1321 /// # Example
1322 /// ```rust
1323 /// # use hydro_lang::prelude::*;
1324 /// # use futures::StreamExt;
1325 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326 /// let tick = process.tick();
1327 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1328 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1329 /// batch.first().all_ticks()
1330 /// # }, |mut stream| async move {
1331 /// // 1
1332 /// # assert_eq!(stream.next().await.unwrap(), 1);
1333 /// # }));
1334 /// ```
1335 pub fn first(self) -> Optional<T, L, B> {
1336 self.reduce_idempotent(q!(|_, _| {}))
1337 }
1338
1339 /// Computes the last element in the stream as an [`Optional`], which
1340 /// will be empty until an element in the input arrives.
1341 ///
1342 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1343 /// re-ordering of elements may cause the last element to change.
1344 ///
1345 /// # Example
1346 /// ```rust
1347 /// # use hydro_lang::prelude::*;
1348 /// # use futures::StreamExt;
1349 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1350 /// let tick = process.tick();
1351 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1352 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1353 /// batch.last().all_ticks()
1354 /// # }, |mut stream| async move {
1355 /// // 4
1356 /// # assert_eq!(stream.next().await.unwrap(), 4);
1357 /// # }));
1358 /// ```
1359 pub fn last(self) -> Optional<T, L, B> {
1360 self.reduce_idempotent(q!(|curr, new| *curr = new))
1361 }
1362}
1363
1364impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1365where
1366 L: Location<'a>,
1367{
1368 /// Returns a stream with the current count tupled with each element in the input stream.
1369 ///
1370 /// # Example
1371 /// ```rust
1372 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1373 /// # use futures::StreamExt;
1374 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1375 /// let tick = process.tick();
1376 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1377 /// numbers.enumerate()
1378 /// # }, |mut stream| async move {
1379 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1380 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1381 /// # assert_eq!(stream.next().await.unwrap(), w);
1382 /// # }
1383 /// # }));
1384 /// ```
1385 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1386 if L::is_top_level() {
1387 Stream::new(
1388 self.location.clone(),
1389 HydroNode::Persist {
1390 inner: Box::new(HydroNode::Enumerate {
1391 is_static: true,
1392 input: Box::new(HydroNode::Unpersist {
1393 inner: Box::new(self.ir_node.into_inner()),
1394 metadata: self.location.new_node_metadata::<T>(),
1395 }),
1396 metadata: self.location.new_node_metadata::<(usize, T)>(),
1397 }),
1398 metadata: self.location.new_node_metadata::<(usize, T)>(),
1399 },
1400 )
1401 } else {
1402 Stream::new(
1403 self.location.clone(),
1404 HydroNode::Enumerate {
1405 is_static: false,
1406 input: Box::new(self.ir_node.into_inner()),
1407 metadata: self.location.new_node_metadata::<(usize, T)>(),
1408 },
1409 )
1410 }
1411 }
1412
1413 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1414 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1415 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1416 ///
1417 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1418 /// to depend on the order of elements in the stream.
1419 ///
1420 /// # Example
1421 /// ```rust
1422 /// # use hydro_lang::prelude::*;
1423 /// # use futures::StreamExt;
1424 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1425 /// let tick = process.tick();
1426 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1427 /// let batch = words.batch(&tick, nondet!(/** test */));
1428 /// batch
1429 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1430 /// .all_ticks()
1431 /// # }, |mut stream| async move {
1432 /// // "HELLOWORLD"
1433 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1434 /// # }));
1435 /// ```
1436 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1437 self,
1438 init: impl IntoQuotedMut<'a, I, L>,
1439 comb: impl IntoQuotedMut<'a, F, L>,
1440 ) -> Singleton<A, L, B> {
1441 let init = init.splice_fn0_ctx(&self.location).into();
1442 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1443
1444 let mut core = HydroNode::Fold {
1445 init,
1446 acc: comb,
1447 input: Box::new(self.ir_node.into_inner()),
1448 metadata: self.location.new_node_metadata::<A>(),
1449 };
1450
1451 if L::is_top_level() {
1452 // top-level (possibly unbounded) singletons are represented as
1453 // a stream which produces all values from all ticks every tick,
1454 // so Unpersist will always give the lastest aggregation
1455 core = HydroNode::Persist {
1456 inner: Box::new(core),
1457 metadata: self.location.new_node_metadata::<A>(),
1458 };
1459 }
1460
1461 Singleton::new(self.location, core)
1462 }
1463
1464 /// Collects all the elements of this stream into a single [`Vec`] element.
1465 ///
1466 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1467 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1468 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1469 /// the vector at an arbitrary point in time.
1470 ///
1471 /// # Example
1472 /// ```rust
1473 /// # use hydro_lang::prelude::*;
1474 /// # use futures::StreamExt;
1475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476 /// let tick = process.tick();
1477 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1479 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1480 /// # }, |mut stream| async move {
1481 /// // [ vec![1, 2, 3, 4] ]
1482 /// # for w in vec![vec![1, 2, 3, 4]] {
1483 /// # assert_eq!(stream.next().await.unwrap(), w);
1484 /// # }
1485 /// # }));
1486 /// ```
1487 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1488 self.fold(
1489 q!(|| vec![]),
1490 q!(|acc, v| {
1491 acc.push(v);
1492 }),
1493 )
1494 }
1495
1496 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1497 /// and emitting each intermediate result.
1498 ///
1499 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1500 /// containing all intermediate accumulated values. The scan operation can also terminate early
1501 /// by returning `None`.
1502 ///
1503 /// The function takes a mutable reference to the accumulator and the current element, and returns
1504 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1505 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1506 ///
1507 /// # Examples
1508 ///
1509 /// Basic usage - running sum:
1510 /// ```rust
1511 /// # use hydro_lang::prelude::*;
1512 /// # use futures::StreamExt;
1513 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1514 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1515 /// q!(|| 0),
1516 /// q!(|acc, x| {
1517 /// *acc += x;
1518 /// Some(*acc)
1519 /// }),
1520 /// )
1521 /// # }, |mut stream| async move {
1522 /// // Output: 1, 3, 6, 10
1523 /// # for w in vec![1, 3, 6, 10] {
1524 /// # assert_eq!(stream.next().await.unwrap(), w);
1525 /// # }
1526 /// # }));
1527 /// ```
1528 ///
1529 /// Early termination example:
1530 /// ```rust
1531 /// # use hydro_lang::prelude::*;
1532 /// # use futures::StreamExt;
1533 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1534 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1535 /// q!(|| 1),
1536 /// q!(|state, x| {
1537 /// *state = *state * x;
1538 /// if *state > 6 {
1539 /// None // Terminate the stream
1540 /// } else {
1541 /// Some(-*state)
1542 /// }
1543 /// }),
1544 /// )
1545 /// # }, |mut stream| async move {
1546 /// // Output: -1, -2, -6
1547 /// # for w in vec![-1, -2, -6] {
1548 /// # assert_eq!(stream.next().await.unwrap(), w);
1549 /// # }
1550 /// # }));
1551 /// ```
1552 pub fn scan<A, U, I, F>(
1553 self,
1554 init: impl IntoQuotedMut<'a, I, L>,
1555 f: impl IntoQuotedMut<'a, F, L>,
1556 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1557 where
1558 I: Fn() -> A + 'a,
1559 F: Fn(&mut A, T) -> Option<U> + 'a,
1560 {
1561 let init = init.splice_fn0_ctx(&self.location).into();
1562 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1563
1564 if L::is_top_level() {
1565 Stream::new(
1566 self.location.clone(),
1567 HydroNode::Persist {
1568 inner: Box::new(HydroNode::Scan {
1569 init,
1570 acc: f,
1571 input: Box::new(self.ir_node.into_inner()),
1572 metadata: self.location.new_node_metadata::<U>(),
1573 }),
1574 metadata: self.location.new_node_metadata::<U>(),
1575 },
1576 )
1577 } else {
1578 Stream::new(
1579 self.location.clone(),
1580 HydroNode::Scan {
1581 init,
1582 acc: f,
1583 input: Box::new(self.ir_node.into_inner()),
1584 metadata: self.location.new_node_metadata::<U>(),
1585 },
1586 )
1587 }
1588 }
1589
1590 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1591 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1592 /// until the first element in the input arrives.
1593 ///
1594 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1595 /// to depend on the order of elements in the stream.
1596 ///
1597 /// # Example
1598 /// ```rust
1599 /// # use hydro_lang::prelude::*;
1600 /// # use futures::StreamExt;
1601 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1602 /// let tick = process.tick();
1603 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1604 /// let batch = words.batch(&tick, nondet!(/** test */));
1605 /// batch
1606 /// .map(q!(|x| x.to_string()))
1607 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1608 /// .all_ticks()
1609 /// # }, |mut stream| async move {
1610 /// // "HELLOWORLD"
1611 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1612 /// # }));
1613 /// ```
1614 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1615 self,
1616 comb: impl IntoQuotedMut<'a, F, L>,
1617 ) -> Optional<T, L, B> {
1618 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1619 let mut core = HydroNode::Reduce {
1620 f,
1621 input: Box::new(self.ir_node.into_inner()),
1622 metadata: self.location.new_node_metadata::<T>(),
1623 };
1624
1625 if L::is_top_level() {
1626 core = HydroNode::Persist {
1627 inner: Box::new(core),
1628 metadata: self.location.new_node_metadata::<T>(),
1629 };
1630 }
1631
1632 Optional::new(self.location, core)
1633 }
1634}
1635
1636impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
1637 Stream<T, L, Unbounded, O, R>
1638{
1639 /// Produces a new stream that interleaves the elements of the two input streams.
1640 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1641 ///
1642 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1643 /// [`Bounded`], you can use [`Stream::chain`] instead.
1644 ///
1645 /// # Example
1646 /// ```rust
1647 /// # use hydro_lang::prelude::*;
1648 /// # use futures::StreamExt;
1649 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1650 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1651 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1652 /// # }, |mut stream| async move {
1653 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1654 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1655 /// # assert_eq!(stream.next().await.unwrap(), w);
1656 /// # }
1657 /// # }));
1658 /// ```
1659 pub fn interleave<O2: Ordering, R2: Retries>(
1660 self,
1661 other: Stream<T, L, Unbounded, O2, R2>,
1662 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1663 where
1664 R: MinRetries<R2>,
1665 {
1666 let tick = self.location.tick();
1667 // Because the outputs are unordered, we can interleave batches from both streams.
1668 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1669 self.batch(&tick, nondet_batch_interleaving)
1670 .weakest_ordering()
1671 .chain(
1672 other
1673 .batch(&tick, nondet_batch_interleaving)
1674 .weakest_ordering(),
1675 )
1676 .all_ticks()
1677 }
1678}
1679
1680impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1681where
1682 L: Location<'a>,
1683{
1684 /// Produces a new stream that emits the input elements in sorted order.
1685 ///
1686 /// The input stream can have any ordering guarantee, but the output stream
1687 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1688 /// elements in the input stream are available, so it requires the input stream
1689 /// to be [`Bounded`].
1690 ///
1691 /// # Example
1692 /// ```rust
1693 /// # use hydro_lang::prelude::*;
1694 /// # use futures::StreamExt;
1695 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1696 /// let tick = process.tick();
1697 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1698 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1699 /// batch.sort().all_ticks()
1700 /// # }, |mut stream| async move {
1701 /// // 1, 2, 3, 4
1702 /// # for w in (1..5) {
1703 /// # assert_eq!(stream.next().await.unwrap(), w);
1704 /// # }
1705 /// # }));
1706 /// ```
1707 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1708 where
1709 T: Ord,
1710 {
1711 Stream::new(
1712 self.location.clone(),
1713 HydroNode::Sort {
1714 input: Box::new(self.ir_node.into_inner()),
1715 metadata: self.location.new_node_metadata::<T>(),
1716 },
1717 )
1718 }
1719
1720 /// Produces a new stream that first emits the elements of the `self` stream,
1721 /// and then emits the elements of the `other` stream. The output stream has
1722 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1723 /// [`TotalOrder`] guarantee.
1724 ///
1725 /// Currently, both input streams must be [`Bounded`]. This operator will block
1726 /// on the first stream until all its elements are available. In a future version,
1727 /// we will relax the requirement on the `other` stream.
1728 ///
1729 /// # Example
1730 /// ```rust
1731 /// # use hydro_lang::prelude::*;
1732 /// # use futures::StreamExt;
1733 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1734 /// let tick = process.tick();
1735 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1736 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1737 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1738 /// # }, |mut stream| async move {
1739 /// // 2, 3, 4, 5, 1, 2, 3, 4
1740 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1741 /// # assert_eq!(stream.next().await.unwrap(), w);
1742 /// # }
1743 /// # }));
1744 /// ```
1745 pub fn chain<O2: Ordering, R2: Retries>(
1746 self,
1747 other: Stream<T, L, Bounded, O2, R2>,
1748 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1749 where
1750 O: MinOrder<O2>,
1751 R: MinRetries<R2>,
1752 {
1753 check_matching_location(&self.location, &other.location);
1754
1755 Stream::new(
1756 self.location.clone(),
1757 HydroNode::Chain {
1758 first: Box::new(self.ir_node.into_inner()),
1759 second: Box::new(other.ir_node.into_inner()),
1760 metadata: self.location.new_node_metadata::<T>(),
1761 },
1762 )
1763 }
1764
1765 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1766 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1767 /// because this is compiled into a nested loop.
1768 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1769 self,
1770 other: Stream<T2, L, Bounded, O2, R>,
1771 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1772 where
1773 T: Clone,
1774 T2: Clone,
1775 {
1776 check_matching_location(&self.location, &other.location);
1777
1778 Stream::new(
1779 self.location.clone(),
1780 HydroNode::CrossProduct {
1781 left: Box::new(self.ir_node.into_inner()),
1782 right: Box::new(other.ir_node.into_inner()),
1783 metadata: self.location.new_node_metadata::<(T, T2)>(),
1784 },
1785 )
1786 }
1787
1788 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1789 /// `self` used as the values for *each* key.
1790 ///
1791 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1792 /// values. For example, it can be used to send the same set of elements to several cluster
1793 /// members, if the membership information is available as a [`KeyedSingleton`].
1794 ///
1795 /// # Example
1796 /// ```rust
1797 /// # use hydro_lang::prelude::*;
1798 /// # use futures::StreamExt;
1799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1800 /// # let tick = process.tick();
1801 /// let keyed_singleton = // { 1: (), 2: () }
1802 /// # process
1803 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1804 /// # .into_keyed()
1805 /// # .batch(&tick, nondet!(/** test */))
1806 /// # .first();
1807 /// let stream = // [ "a", "b" ]
1808 /// # process
1809 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1810 /// # .batch(&tick, nondet!(/** test */));
1811 /// stream.repeat_with_keys(keyed_singleton)
1812 /// # .entries().all_ticks()
1813 /// # }, |mut stream| async move {
1814 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1815 /// # let mut results = Vec::new();
1816 /// # for _ in 0..4 {
1817 /// # results.push(stream.next().await.unwrap());
1818 /// # }
1819 /// # results.sort();
1820 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1821 /// # }));
1822 /// ```
1823 pub fn repeat_with_keys<K, V2>(
1824 self,
1825 keys: KeyedSingleton<K, V2, L, Bounded>,
1826 ) -> KeyedStream<K, T, L, Bounded, O, R>
1827 where
1828 K: Clone,
1829 T: Clone,
1830 {
1831 keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering(
1832 nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
1833 )
1834 }
1835}
1836
1837impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1838where
1839 L: Location<'a>,
1840{
1841 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1842 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1843 /// by equi-joining the two streams on the key attribute `K`.
1844 ///
1845 /// # Example
1846 /// ```rust
1847 /// # use hydro_lang::prelude::*;
1848 /// # use std::collections::HashSet;
1849 /// # use futures::StreamExt;
1850 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1851 /// let tick = process.tick();
1852 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1853 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1854 /// stream1.join(stream2)
1855 /// # }, |mut stream| async move {
1856 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1857 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1858 /// # stream.map(|i| assert!(expected.contains(&i)));
1859 /// # }));
1860 pub fn join<V2, O2: Ordering, R2: Retries>(
1861 self,
1862 n: Stream<(K, V2), L, B, O2, R2>,
1863 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1864 where
1865 K: Eq + Hash,
1866 R: MinRetries<R2>,
1867 {
1868 check_matching_location(&self.location, &n.location);
1869
1870 Stream::new(
1871 self.location.clone(),
1872 HydroNode::Join {
1873 left: Box::new(self.ir_node.into_inner()),
1874 right: Box::new(n.ir_node.into_inner()),
1875 metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1876 },
1877 )
1878 }
1879
1880 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1881 /// computes the anti-join of the items in the input -- i.e. returns
1882 /// unique items in the first input that do not have a matching key
1883 /// in the second input.
1884 ///
1885 /// # Example
1886 /// ```rust
1887 /// # use hydro_lang::prelude::*;
1888 /// # use futures::StreamExt;
1889 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1890 /// let tick = process.tick();
1891 /// let stream = process
1892 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1893 /// .batch(&tick, nondet!(/** test */));
1894 /// let batch = process
1895 /// .source_iter(q!(vec![1, 2]))
1896 /// .batch(&tick, nondet!(/** test */));
1897 /// stream.anti_join(batch).all_ticks()
1898 /// # }, |mut stream| async move {
1899 /// # for w in vec![(3, 'c'), (4, 'd')] {
1900 /// # assert_eq!(stream.next().await.unwrap(), w);
1901 /// # }
1902 /// # }));
1903 pub fn anti_join<O2: Ordering, R2: Retries>(
1904 self,
1905 n: Stream<K, L, Bounded, O2, R2>,
1906 ) -> Stream<(K, V1), L, B, O, R>
1907 where
1908 K: Eq + Hash,
1909 {
1910 check_matching_location(&self.location, &n.location);
1911
1912 Stream::new(
1913 self.location.clone(),
1914 HydroNode::AntiJoin {
1915 pos: Box::new(self.ir_node.into_inner()),
1916 neg: Box::new(n.ir_node.into_inner()),
1917 metadata: self.location.new_node_metadata::<(K, V1)>(),
1918 },
1919 )
1920 }
1921}
1922
1923impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1924 Stream<(K, V), L, B, O, R>
1925{
1926 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1927 /// is used as the key and the second element is added to the entries associated with that key.
1928 ///
1929 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1930 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1931 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1932 /// total ordering _within_ each group but no ordering _across_ groups.
1933 ///
1934 /// # Example
1935 /// ```rust
1936 /// # use hydro_lang::prelude::*;
1937 /// # use futures::StreamExt;
1938 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1939 /// process
1940 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1941 /// .into_keyed()
1942 /// # .entries()
1943 /// # }, |mut stream| async move {
1944 /// // { 1: [2, 3], 2: [4] }
1945 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1946 /// # assert_eq!(stream.next().await.unwrap(), w);
1947 /// # }
1948 /// # }));
1949 /// ```
1950 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1951 KeyedStream {
1952 underlying: self.weakest_ordering(),
1953 _phantom_order: Default::default(),
1954 }
1955 }
1956}
1957
1958impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1959where
1960 K: Eq + Hash,
1961 L: Location<'a>,
1962{
1963 #[deprecated = "use .into_keyed().fold(...) instead"]
1964 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1965 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1966 /// in the second element are accumulated via the `comb` closure.
1967 ///
1968 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1969 /// to depend on the order of elements in the stream.
1970 ///
1971 /// If the input and output value types are the same and do not require initialization then use
1972 /// [`Stream::reduce_keyed`].
1973 ///
1974 /// # Example
1975 /// ```rust
1976 /// # use hydro_lang::prelude::*;
1977 /// # use futures::StreamExt;
1978 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1979 /// let tick = process.tick();
1980 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1981 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1982 /// batch
1983 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1984 /// .all_ticks()
1985 /// # }, |mut stream| async move {
1986 /// // (1, 5), (2, 7)
1987 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1988 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1989 /// # }));
1990 /// ```
1991 pub fn fold_keyed<A, I, F>(
1992 self,
1993 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1994 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1995 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1996 where
1997 I: Fn() -> A + 'a,
1998 F: Fn(&mut A, V) + 'a,
1999 {
2000 self.into_keyed().fold(init, comb).entries()
2001 }
2002
2003 #[deprecated = "use .into_keyed().reduce(...) instead"]
2004 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2005 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2006 /// in the second element are accumulated via the `comb` closure.
2007 ///
2008 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2009 /// to depend on the order of elements in the stream.
2010 ///
2011 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2012 ///
2013 /// # Example
2014 /// ```rust
2015 /// # use hydro_lang::prelude::*;
2016 /// # use futures::StreamExt;
2017 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2018 /// let tick = process.tick();
2019 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2020 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2021 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2022 /// # }, |mut stream| async move {
2023 /// // (1, 5), (2, 7)
2024 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2025 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2026 /// # }));
2027 /// ```
2028 pub fn reduce_keyed<F>(
2029 self,
2030 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2031 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2032 where
2033 F: Fn(&mut V, V) + 'a,
2034 {
2035 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2036
2037 Stream::new(
2038 self.location.clone(),
2039 HydroNode::ReduceKeyed {
2040 f,
2041 input: Box::new(self.ir_node.into_inner()),
2042 metadata: self.location.new_node_metadata::<(K, V)>(),
2043 },
2044 )
2045 }
2046}
2047
2048impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2049where
2050 K: Eq + Hash,
2051 L: Location<'a>,
2052{
2053 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2054 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2055 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2056 /// in the second element are accumulated via the `comb` closure.
2057 ///
2058 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2059 /// as there may be non-deterministic duplicates.
2060 ///
2061 /// If the input and output value types are the same and do not require initialization then use
2062 /// [`Stream::reduce_keyed_commutative_idempotent`].
2063 ///
2064 /// # Example
2065 /// ```rust
2066 /// # use hydro_lang::prelude::*;
2067 /// # use futures::StreamExt;
2068 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2069 /// let tick = process.tick();
2070 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2071 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2072 /// batch
2073 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2074 /// .all_ticks()
2075 /// # }, |mut stream| async move {
2076 /// // (1, false), (2, true)
2077 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2078 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2079 /// # }));
2080 /// ```
2081 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2082 self,
2083 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2084 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2085 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2086 where
2087 I: Fn() -> A + 'a,
2088 F: Fn(&mut A, V) + 'a,
2089 {
2090 self.into_keyed()
2091 .fold_commutative_idempotent(init, comb)
2092 .entries()
2093 }
2094
2095 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2096 /// # Example
2097 /// ```rust
2098 /// # use hydro_lang::prelude::*;
2099 /// # use futures::StreamExt;
2100 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2101 /// let tick = process.tick();
2102 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2103 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2104 /// batch.keys().all_ticks()
2105 /// # }, |mut stream| async move {
2106 /// // 1, 2
2107 /// # assert_eq!(stream.next().await.unwrap(), 1);
2108 /// # assert_eq!(stream.next().await.unwrap(), 2);
2109 /// # }));
2110 /// ```
2111 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2112 self.into_keyed()
2113 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2114 .keys()
2115 }
2116
2117 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2118 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2119 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2120 /// in the second element are accumulated via the `comb` closure.
2121 ///
2122 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2123 /// as there may be non-deterministic duplicates.
2124 ///
2125 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2126 ///
2127 /// # Example
2128 /// ```rust
2129 /// # use hydro_lang::prelude::*;
2130 /// # use futures::StreamExt;
2131 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2132 /// let tick = process.tick();
2133 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2134 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2135 /// batch
2136 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2137 /// .all_ticks()
2138 /// # }, |mut stream| async move {
2139 /// // (1, false), (2, true)
2140 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2141 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2142 /// # }));
2143 /// ```
2144 pub fn reduce_keyed_commutative_idempotent<F>(
2145 self,
2146 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2147 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2148 where
2149 F: Fn(&mut V, V) + 'a,
2150 {
2151 self.into_keyed()
2152 .reduce_commutative_idempotent(comb)
2153 .entries()
2154 }
2155}
2156
2157impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2158where
2159 K: Eq + Hash,
2160 L: Location<'a>,
2161{
2162 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2163 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2164 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2165 /// in the second element are accumulated via the `comb` closure.
2166 ///
2167 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2168 ///
2169 /// If the input and output value types are the same and do not require initialization then use
2170 /// [`Stream::reduce_keyed_commutative`].
2171 ///
2172 /// # Example
2173 /// ```rust
2174 /// # use hydro_lang::prelude::*;
2175 /// # use futures::StreamExt;
2176 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2177 /// let tick = process.tick();
2178 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2179 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180 /// batch
2181 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2182 /// .all_ticks()
2183 /// # }, |mut stream| async move {
2184 /// // (1, 5), (2, 7)
2185 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2186 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2187 /// # }));
2188 /// ```
2189 pub fn fold_keyed_commutative<A, I, F>(
2190 self,
2191 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2192 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2193 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2194 where
2195 I: Fn() -> A + 'a,
2196 F: Fn(&mut A, V) + 'a,
2197 {
2198 self.into_keyed().fold_commutative(init, comb).entries()
2199 }
2200
2201 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2202 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2203 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2204 /// in the second element are accumulated via the `comb` closure.
2205 ///
2206 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2207 ///
2208 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2209 ///
2210 /// # Example
2211 /// ```rust
2212 /// # use hydro_lang::prelude::*;
2213 /// # use futures::StreamExt;
2214 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2215 /// let tick = process.tick();
2216 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2217 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2218 /// batch
2219 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2220 /// .all_ticks()
2221 /// # }, |mut stream| async move {
2222 /// // (1, 5), (2, 7)
2223 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2224 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2225 /// # }));
2226 /// ```
2227 pub fn reduce_keyed_commutative<F>(
2228 self,
2229 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2230 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2231 where
2232 F: Fn(&mut V, V) + 'a,
2233 {
2234 self.into_keyed().reduce_commutative(comb).entries()
2235 }
2236}
2237
2238impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2239where
2240 K: Eq + Hash,
2241 L: Location<'a>,
2242{
2243 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2244 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2245 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2246 /// in the second element are accumulated via the `comb` closure.
2247 ///
2248 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2249 ///
2250 /// If the input and output value types are the same and do not require initialization then use
2251 /// [`Stream::reduce_keyed_idempotent`].
2252 ///
2253 /// # Example
2254 /// ```rust
2255 /// # use hydro_lang::prelude::*;
2256 /// # use futures::StreamExt;
2257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258 /// let tick = process.tick();
2259 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2260 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2261 /// batch
2262 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2263 /// .all_ticks()
2264 /// # }, |mut stream| async move {
2265 /// // (1, false), (2, true)
2266 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2267 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2268 /// # }));
2269 /// ```
2270 pub fn fold_keyed_idempotent<A, I, F>(
2271 self,
2272 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2273 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2274 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2275 where
2276 I: Fn() -> A + 'a,
2277 F: Fn(&mut A, V) + 'a,
2278 {
2279 self.into_keyed().fold_idempotent(init, comb).entries()
2280 }
2281
2282 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2283 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2284 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2285 /// in the second element are accumulated via the `comb` closure.
2286 ///
2287 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2288 ///
2289 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2290 ///
2291 /// # Example
2292 /// ```rust
2293 /// # use hydro_lang::prelude::*;
2294 /// # use futures::StreamExt;
2295 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2296 /// let tick = process.tick();
2297 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2298 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2299 /// batch
2300 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2301 /// .all_ticks()
2302 /// # }, |mut stream| async move {
2303 /// // (1, false), (2, true)
2304 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2305 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2306 /// # }));
2307 /// ```
2308 pub fn reduce_keyed_idempotent<F>(
2309 self,
2310 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2311 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2312 where
2313 F: Fn(&mut V, V) + 'a,
2314 {
2315 self.into_keyed().reduce_idempotent(comb).entries()
2316 }
2317}
2318
2319impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2320where
2321 L: Location<'a> + NoTick,
2322{
2323 /// Returns a stream corresponding to the latest batch of elements being atomically
2324 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2325 /// the order of the input.
2326 ///
2327 /// # Non-Determinism
2328 /// The batch boundaries are non-deterministic and may change across executions.
2329 pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2330 Stream::new(
2331 self.location.clone().tick,
2332 HydroNode::Unpersist {
2333 inner: Box::new(self.ir_node.into_inner()),
2334 metadata: self.location.new_node_metadata::<T>(),
2335 },
2336 )
2337 }
2338
2339 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2340 /// See [`Stream::atomic`] for more details.
2341 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2342 Stream::new(self.location.tick.l, self.ir_node.into_inner())
2343 }
2344
2345 /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2346 pub fn atomic_source(&self) -> Tick<L> {
2347 self.location.tick.clone()
2348 }
2349}
2350
2351impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2352where
2353 L: Location<'a> + NoTick + NoAtomic,
2354{
2355 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2356 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2357 ///
2358 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2359 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2360 /// argument that declares where the stream will be atomically processed. Batching a stream into
2361 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2362 /// [`Tick`] will introduce asynchrony.
2363 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2364 Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2365 }
2366
2367 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2368 /// Future outputs are produced as available, regardless of input arrival order.
2369 ///
2370 /// # Example
2371 /// ```rust
2372 /// # use std::collections::HashSet;
2373 /// # use futures::StreamExt;
2374 /// # use hydro_lang::prelude::*;
2375 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2376 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2377 /// .map(q!(|x| async move {
2378 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2379 /// x
2380 /// }))
2381 /// .resolve_futures()
2382 /// # },
2383 /// # |mut stream| async move {
2384 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2385 /// # let mut output = HashSet::new();
2386 /// # for _ in 1..10 {
2387 /// # output.insert(stream.next().await.unwrap());
2388 /// # }
2389 /// # assert_eq!(
2390 /// # output,
2391 /// # HashSet::<i32>::from_iter(1..10)
2392 /// # );
2393 /// # },
2394 /// # ));
2395 pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2396 where
2397 T: Future<Output = T2>,
2398 {
2399 Stream::new(
2400 self.location.clone(),
2401 HydroNode::ResolveFutures {
2402 input: Box::new(self.ir_node.into_inner()),
2403 metadata: self.location.new_node_metadata::<T2>(),
2404 },
2405 )
2406 }
2407
2408 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2409 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2410 /// the order of the input. The output stream will execute in the [`Tick`] that was
2411 /// used to create the atomic section.
2412 ///
2413 /// # Non-Determinism
2414 /// The batch boundaries are non-deterministic and may change across executions.
2415 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2416 self.atomic(tick).batch(nondet)
2417 }
2418
2419 /// Given a time interval, returns a stream corresponding to samples taken from the
2420 /// stream roughly at that interval. The output will have elements in the same order
2421 /// as the input, but with arbitrary elements skipped between samples. There is also
2422 /// no guarantee on the exact timing of the samples.
2423 ///
2424 /// # Non-Determinism
2425 /// The output stream is non-deterministic in which elements are sampled, since this
2426 /// is controlled by a clock.
2427 pub fn sample_every(
2428 self,
2429 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2430 nondet: NonDet,
2431 ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2432 let samples = self.location.source_interval(interval, nondet);
2433
2434 let tick = self.location.tick();
2435 self.batch(&tick, nondet)
2436 .filter_if_some(samples.batch(&tick, nondet).first())
2437 .all_ticks()
2438 .weakest_retries()
2439 }
2440
2441 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2442 /// stream has not emitted a value since that duration.
2443 ///
2444 /// # Non-Determinism
2445 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2446 /// samples take place, timeouts may be non-deterministically generated or missed,
2447 /// and the notification of the timeout may be delayed as well. There is also no
2448 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2449 /// detected based on when the next sample is taken.
2450 pub fn timeout(
2451 self,
2452 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2453 nondet: NonDet,
2454 ) -> Optional<(), L, Unbounded> {
2455 let tick = self.location.tick();
2456
2457 let latest_received = self.assume_retries(nondet).fold_commutative(
2458 q!(|| None),
2459 q!(|latest, _| {
2460 *latest = Some(Instant::now());
2461 }),
2462 );
2463
2464 latest_received
2465 .snapshot(&tick, nondet)
2466 .filter_map(q!(move |latest_received| {
2467 if let Some(latest_received) = latest_received {
2468 if Instant::now().duration_since(latest_received) > duration {
2469 Some(())
2470 } else {
2471 None
2472 }
2473 } else {
2474 Some(())
2475 }
2476 }))
2477 .latest()
2478 }
2479}
2480
2481impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2482where
2483 L: Location<'a> + NoTick + NoAtomic,
2484 F: Future<Output = T>,
2485{
2486 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2487 /// Future outputs are produced in the same order as the input stream.
2488 ///
2489 /// # Example
2490 /// ```rust
2491 /// # use std::collections::HashSet;
2492 /// # use futures::StreamExt;
2493 /// # use hydro_lang::prelude::*;
2494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2495 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2496 /// .map(q!(|x| async move {
2497 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2498 /// x
2499 /// }))
2500 /// .resolve_futures_ordered()
2501 /// # },
2502 /// # |mut stream| async move {
2503 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2504 /// # let mut output = Vec::new();
2505 /// # for _ in 1..10 {
2506 /// # output.push(stream.next().await.unwrap());
2507 /// # }
2508 /// # assert_eq!(
2509 /// # output,
2510 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2511 /// # );
2512 /// # },
2513 /// # ));
2514 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2515 Stream::new(
2516 self.location.clone(),
2517 HydroNode::ResolveFuturesOrdered {
2518 input: Box::new(self.ir_node.into_inner()),
2519 metadata: self.location.new_node_metadata::<T>(),
2520 },
2521 )
2522 }
2523}
2524
2525impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2526where
2527 L: Location<'a> + NoTick,
2528{
2529 /// Executes the provided closure for every element in this stream.
2530 ///
2531 /// Because the closure may have side effects, the stream must have deterministic order
2532 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2533 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2534 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2535 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2536 let f = f.splice_fn1_ctx(&self.location).into();
2537 let metadata = self.location.new_node_metadata::<T>();
2538 self.location
2539 .flow_state()
2540 .borrow_mut()
2541 .push_root(HydroRoot::ForEach {
2542 input: Box::new(HydroNode::Unpersist {
2543 inner: Box::new(self.ir_node.into_inner()),
2544 metadata: metadata.clone(),
2545 }),
2546 f,
2547 op_metadata: HydroIrOpMetadata::new(),
2548 });
2549 }
2550
2551 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2552 /// TCP socket to some other server. You should _not_ use this API for interacting with
2553 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2554 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2555 /// interaction with asynchronous sinks.
2556 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2557 where
2558 S: 'a + futures::Sink<T> + Unpin,
2559 {
2560 self.location
2561 .flow_state()
2562 .borrow_mut()
2563 .push_root(HydroRoot::DestSink {
2564 sink: sink.splice_typed_ctx(&self.location).into(),
2565 input: Box::new(self.ir_node.into_inner()),
2566 op_metadata: HydroIrOpMetadata::new(),
2567 });
2568 }
2569}
2570
2571impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2572where
2573 L: Location<'a>,
2574{
2575 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2576 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2577 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2578 Stream::new(
2579 self.location.outer().clone(),
2580 HydroNode::Persist {
2581 inner: Box::new(self.ir_node.into_inner()),
2582 metadata: self.location.new_node_metadata::<T>(),
2583 },
2584 )
2585 }
2586
2587 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2588 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2589 ///
2590 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2591 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2592 /// stream's [`Tick`] context.
2593 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2594 Stream::new(
2595 Atomic {
2596 tick: self.location.clone(),
2597 },
2598 HydroNode::Persist {
2599 inner: Box::new(self.ir_node.into_inner()),
2600 metadata: self.location.new_node_metadata::<T>(),
2601 },
2602 )
2603 }
2604
2605 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2606 ///
2607 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2608 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2609 /// careful when using this operator, as its memory usage will grow linearly over time since it
2610 /// must store its inputs indefinitely.
2611 ///
2612 /// # Example
2613 /// ```rust
2614 /// # use hydro_lang::prelude::*;
2615 /// # use futures::StreamExt;
2616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2617 /// let tick = process.tick();
2618 /// // ticks are lazy by default, forces the second tick to run
2619 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2620 ///
2621 /// let batch_first_tick = process
2622 /// .source_iter(q!(vec![1, 2, 3, 4]))
2623 /// .batch(&tick, nondet!(/** test */));
2624 /// let batch_second_tick = process
2625 /// .source_iter(q!(vec![5, 6, 7, 8]))
2626 /// .batch(&tick, nondet!(/** test */))
2627 /// .defer_tick(); // appears on the second tick
2628 /// batch_first_tick.chain(batch_second_tick)
2629 /// .persist()
2630 /// .all_ticks()
2631 /// # }, |mut stream| async move {
2632 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2633 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2634 /// # assert_eq!(stream.next().await.unwrap(), w);
2635 /// # }
2636 /// # }));
2637 /// ```
2638 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2639 where
2640 T: Clone,
2641 {
2642 Stream::new(
2643 self.location.clone(),
2644 HydroNode::Persist {
2645 inner: Box::new(self.ir_node.into_inner()),
2646 metadata: self.location.new_node_metadata::<T>(),
2647 },
2648 )
2649 }
2650
2651 #[expect(missing_docs, reason = "TODO")]
2652 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2653 Stream::new(
2654 self.location.clone(),
2655 HydroNode::DeferTick {
2656 input: Box::new(self.ir_node.into_inner()),
2657 metadata: self.location.new_node_metadata::<T>(),
2658 },
2659 )
2660 }
2661}
2662
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    mod backtrace_chained_ops;

    // Marker types that only serve as tags for the processes in the test flows.
    struct P1 {}
    struct P2 {}

    // Payload that is bincode-serialized when it crosses the network.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // Sends 0..10 from one process through a second one to an external client and checks that
    // the values come out in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let sender = flow.process::<P1>();
        let relay = flow.process::<P2>();
        let external = flow.external::<P2>();

        let out_port = sender
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&relay)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The external source is connected before the deployment is started.
        let mut received = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let message = received.next().await.unwrap();
            assert_eq!(message.n, expected);
        }
    }

    // Taking `first()` of a three-element stream must leave exactly one element to count.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let count_port = tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut counts = nodes.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    // A `scan` over an unbounded input must keep its accumulator between inputs: sending 1 and
    // then 2 yields the running sums 1 and 3.
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let running_sum_port = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = nodes.connect_sink_bincode(input_port).await;
        let mut from_node = nodes.connect_source_bincode(running_sum_port).await;

        deployment.start().await.unwrap();

        to_node.send(1).await.unwrap();
        let after_first = from_node.next().await.unwrap();
        assert_eq!(after_first, 1);

        to_node.send(2).await.unwrap();
        let after_second = from_node.next().await.unwrap();
        assert_eq!(after_second, 3);
    }
}