hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
33#[sealed::sealed]
34pub trait Ordering:
35 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
36{
37 /// The [`StreamOrder`] corresponding to this type.
38 const ORDERING_KIND: StreamOrder;
39}
40
41/// Marks the stream as being totally ordered, which means that there are
42/// no sources of non-determinism (other than intentional ones) that will
43/// affect the order of elements.
44pub enum TotalOrder {}
45
46#[sealed::sealed]
47impl Ordering for TotalOrder {
48 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
49}
50
51/// Marks the stream as having no order, which means that the order of
52/// elements may be affected by non-determinism.
53///
54/// This restricts certain operators, such as `fold` and `reduce`, to only
55/// be used with commutative aggregation functions.
56pub enum NoOrder {}
57
58#[sealed::sealed]
59impl Ordering for NoOrder {
60 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
61}
62
63/// Helper trait for determining the weakest of two orderings.
64#[sealed::sealed]
65pub trait MinOrder<Other: ?Sized> {
66 /// The weaker of the two orderings.
67 type Min: Ordering;
68}
69
70#[sealed::sealed]
71impl MinOrder<NoOrder> for TotalOrder {
72 type Min = NoOrder;
73}
74
75#[sealed::sealed]
76impl MinOrder<TotalOrder> for TotalOrder {
77 type Min = TotalOrder;
78}
79
80#[sealed::sealed]
81impl MinOrder<TotalOrder> for NoOrder {
82 type Min = NoOrder;
83}
84
85#[sealed::sealed]
86impl MinOrder<NoOrder> for NoOrder {
87 type Min = NoOrder;
88}
89
90/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
91#[sealed::sealed]
92pub trait Retries:
93 MinRetries<Self, Min = Self>
94 + MinRetries<ExactlyOnce, Min = Self>
95 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
96{
97 /// The [`StreamRetry`] corresponding to this type.
98 const RETRIES_KIND: StreamRetry;
99}
100
101/// Marks the stream as having deterministic message cardinality, with no
102/// possibility of duplicates.
103pub enum ExactlyOnce {}
104
105#[sealed::sealed]
106impl Retries for ExactlyOnce {
107 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
108}
109
110/// Marks the stream as having non-deterministic message cardinality, which
111/// means that duplicates may occur, but messages will not be dropped.
112pub enum AtLeastOnce {}
113
114#[sealed::sealed]
115impl Retries for AtLeastOnce {
116 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
117}
118
119/// Helper trait for determining the weakest of two retry guarantees.
120#[sealed::sealed]
121pub trait MinRetries<Other: ?Sized> {
122 /// The weaker of the two retry guarantees.
123 type Min: Retries;
124}
125
126#[sealed::sealed]
127impl MinRetries<AtLeastOnce> for ExactlyOnce {
128 type Min = AtLeastOnce;
129}
130
131#[sealed::sealed]
132impl MinRetries<ExactlyOnce> for ExactlyOnce {
133 type Min = ExactlyOnce;
134}
135
136#[sealed::sealed]
137impl MinRetries<ExactlyOnce> for AtLeastOnce {
138 type Min = AtLeastOnce;
139}
140
141#[sealed::sealed]
142impl MinRetries<AtLeastOnce> for AtLeastOnce {
143 type Min = AtLeastOnce;
144}
145
146/// Streaming sequence of elements with type `Type`.
147///
148/// This live collection represents a growing sequence of elements, with new elements being
149/// asynchronously appended to the end of the sequence. This can be used to model the arrival
150/// of network input, such as API requests, or streaming ingestion.
151///
152/// By default, all streams have deterministic ordering and each element is materialized exactly
153/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
154/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
155/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
156///
157/// Type Parameters:
158/// - `Type`: the type of elements in the stream
159/// - `Loc`: the location where the stream is being materialized
160/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
161/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
162/// (default is [`TotalOrder`])
163/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
164/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
165pub struct Stream<
166 Type,
167 Loc,
168 Bound: Boundedness = Unbounded,
169 Order: Ordering = TotalOrder,
170 Retry: Retries = ExactlyOnce,
171> {
172 pub(crate) location: Loc,
173 pub(crate) ir_node: RefCell<HydroNode>,
174
175 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
176}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179 for Stream<T, L, Unbounded, O, R>
180where
181 L: Location<'a>,
182{
183 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184 Stream {
185 location: stream.location,
186 ir_node: stream.ir_node,
187 _phantom: PhantomData,
188 }
189 }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193 for Stream<T, L, B, NoOrder, R>
194where
195 L: Location<'a>,
196{
197 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198 Stream {
199 location: stream.location,
200 ir_node: stream.ir_node,
201 _phantom: PhantomData,
202 }
203 }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207 for Stream<T, L, B, O, AtLeastOnce>
208where
209 L: Location<'a>,
210{
211 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212 Stream {
213 location: stream.location,
214 ir_node: stream.ir_node,
215 _phantom: PhantomData,
216 }
217 }
218}
219
// Implements `DeferTick` for bounded, tick-scoped streams.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to `Stream::defer_tick`.
    // NOTE(review): this presumably resolves to an inherent `defer_tick` defined elsewhere
    // in this file (not visible here). Path syntax prefers inherent associated functions
    // over trait methods; if no inherent method existed, this call would recurse into
    // itself. Confirm.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230 for Stream<T, Tick<L>, Bounded, O, R>
231where
232 L: Location<'a>,
233{
234 type Location = Tick<L>;
235
236 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237 Stream::new(
238 location.clone(),
239 HydroNode::CycleSource {
240 ident,
241 metadata: location.new_node_metadata(Self::collection_kind()),
242 },
243 )
244 }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248 for Stream<T, Tick<L>, Bounded, O, R>
249where
250 L: Location<'a>,
251{
252 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253 assert_eq!(
254 Location::id(&self.location),
255 expected_location,
256 "locations do not match"
257 );
258 self.location
259 .flow_state()
260 .borrow_mut()
261 .push_root(HydroRoot::CycleSink {
262 ident,
263 input: Box::new(self.ir_node.into_inner()),
264 op_metadata: HydroIrOpMetadata::new(),
265 });
266 }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270 for Stream<T, L, B, O, R>
271where
272 L: Location<'a> + NoTick,
273{
274 type Location = L;
275
276 fn create_source(ident: syn::Ident, location: L) -> Self {
277 Stream::new(
278 location.clone(),
279 HydroNode::CycleSource {
280 ident,
281 metadata: location.new_node_metadata(Self::collection_kind()),
282 },
283 )
284 }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288 for Stream<T, L, B, O, R>
289where
290 L: Location<'a> + NoTick,
291{
292 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293 assert_eq!(
294 Location::id(&self.location),
295 expected_location,
296 "locations do not match"
297 );
298 self.location
299 .flow_state()
300 .borrow_mut()
301 .push_root(HydroRoot::CycleSink {
302 ident,
303 input: Box::new(self.ir_node.into_inner()),
304 op_metadata: HydroIrOpMetadata::new(),
305 });
306 }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311 T: Clone,
312 L: Location<'a>,
313{
314 fn clone(&self) -> Self {
315 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317 *self.ir_node.borrow_mut() = HydroNode::Tee {
318 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319 metadata: self.location.new_node_metadata(Self::collection_kind()),
320 };
321 }
322
323 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324 Stream {
325 location: self.location.clone(),
326 ir_node: HydroNode::Tee {
327 inner: TeeNode(inner.0.clone()),
328 metadata: metadata.clone(),
329 }
330 .into(),
331 _phantom: PhantomData,
332 }
333 } else {
334 unreachable!()
335 }
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341 L: Location<'a>,
342{
343 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347 Stream {
348 location,
349 ir_node: RefCell::new(ir_node),
350 _phantom: PhantomData,
351 }
352 }
353
354 /// Returns the [`Location`] where this stream is being materialized.
355 pub fn location(&self) -> &L {
356 &self.location
357 }
358
359 pub(crate) fn collection_kind() -> CollectionKind {
360 CollectionKind::Stream {
361 bound: B::BOUND_KIND,
362 order: O::ORDERING_KIND,
363 retry: R::RETRIES_KIND,
364 element_type: stageleft::quote_type::<T>().into(),
365 }
366 }
367
368 /// Produces a stream based on invoking `f` on each element.
369 /// If you do not want to modify the stream and instead only want to view
370 /// each item use [`Stream::inspect`] instead.
371 ///
372 /// # Example
373 /// ```rust
374 /// # #[cfg(feature = "deploy")] {
375 /// # use hydro_lang::prelude::*;
376 /// # use futures::StreamExt;
377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
378 /// let words = process.source_iter(q!(vec!["hello", "world"]));
379 /// words.map(q!(|x| x.to_uppercase()))
380 /// # }, |mut stream| async move {
381 /// # for w in vec!["HELLO", "WORLD"] {
382 /// # assert_eq!(stream.next().await.unwrap(), w);
383 /// # }
384 /// # }));
385 /// # }
386 /// ```
387 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
388 where
389 F: Fn(T) -> U + 'a,
390 {
391 let f = f.splice_fn1_ctx(&self.location).into();
392 Stream::new(
393 self.location.clone(),
394 HydroNode::Map {
395 f,
396 input: Box::new(self.ir_node.into_inner()),
397 metadata: self
398 .location
399 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
400 },
401 )
402 }
403
404 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
405 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
406 /// for the output type `U` must produce items in a **deterministic** order.
407 ///
408 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
409 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
410 ///
411 /// # Example
412 /// ```rust
413 /// # #[cfg(feature = "deploy")] {
414 /// # use hydro_lang::prelude::*;
415 /// # use futures::StreamExt;
416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
417 /// process
418 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
419 /// .flat_map_ordered(q!(|x| x))
420 /// # }, |mut stream| async move {
421 /// // 1, 2, 3, 4
422 /// # for w in (1..5) {
423 /// # assert_eq!(stream.next().await.unwrap(), w);
424 /// # }
425 /// # }));
426 /// # }
427 /// ```
428 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
429 where
430 I: IntoIterator<Item = U>,
431 F: Fn(T) -> I + 'a,
432 {
433 let f = f.splice_fn1_ctx(&self.location).into();
434 Stream::new(
435 self.location.clone(),
436 HydroNode::FlatMap {
437 f,
438 input: Box::new(self.ir_node.into_inner()),
439 metadata: self
440 .location
441 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
442 },
443 )
444 }
445
446 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
447 /// for the output type `U` to produce items in any order.
448 ///
449 /// # Example
450 /// ```rust
451 /// # #[cfg(feature = "deploy")] {
452 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
453 /// # use futures::StreamExt;
454 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
455 /// process
456 /// .source_iter(q!(vec![
457 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
458 /// std::collections::HashSet::from_iter(vec![3, 4]),
459 /// ]))
460 /// .flat_map_unordered(q!(|x| x))
461 /// # }, |mut stream| async move {
462 /// // 1, 2, 3, 4, but in no particular order
463 /// # let mut results = Vec::new();
464 /// # for w in (1..5) {
465 /// # results.push(stream.next().await.unwrap());
466 /// # }
467 /// # results.sort();
468 /// # assert_eq!(results, vec![1, 2, 3, 4]);
469 /// # }));
470 /// # }
471 /// ```
472 pub fn flat_map_unordered<U, I, F>(
473 self,
474 f: impl IntoQuotedMut<'a, F, L>,
475 ) -> Stream<U, L, B, NoOrder, R>
476 where
477 I: IntoIterator<Item = U>,
478 F: Fn(T) -> I + 'a,
479 {
480 let f = f.splice_fn1_ctx(&self.location).into();
481 Stream::new(
482 self.location.clone(),
483 HydroNode::FlatMap {
484 f,
485 input: Box::new(self.ir_node.into_inner()),
486 metadata: self
487 .location
488 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
489 },
490 )
491 }
492
493 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
494 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
495 ///
496 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
497 /// not deterministic, use [`Stream::flatten_unordered`] instead.
498 ///
499 /// ```rust
500 /// # #[cfg(feature = "deploy")] {
501 /// # use hydro_lang::prelude::*;
502 /// # use futures::StreamExt;
503 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
504 /// process
505 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
506 /// .flatten_ordered()
507 /// # }, |mut stream| async move {
508 /// // 1, 2, 3, 4
509 /// # for w in (1..5) {
510 /// # assert_eq!(stream.next().await.unwrap(), w);
511 /// # }
512 /// # }));
513 /// # }
514 /// ```
515 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
516 where
517 T: IntoIterator<Item = U>,
518 {
519 self.flat_map_ordered(q!(|d| d))
520 }
521
522 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
523 /// for the element type `T` to produce items in any order.
524 ///
525 /// # Example
526 /// ```rust
527 /// # #[cfg(feature = "deploy")] {
528 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
529 /// # use futures::StreamExt;
530 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
531 /// process
532 /// .source_iter(q!(vec![
533 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
534 /// std::collections::HashSet::from_iter(vec![3, 4]),
535 /// ]))
536 /// .flatten_unordered()
537 /// # }, |mut stream| async move {
538 /// // 1, 2, 3, 4, but in no particular order
539 /// # let mut results = Vec::new();
540 /// # for w in (1..5) {
541 /// # results.push(stream.next().await.unwrap());
542 /// # }
543 /// # results.sort();
544 /// # assert_eq!(results, vec![1, 2, 3, 4]);
545 /// # }));
546 /// # }
547 /// ```
548 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
549 where
550 T: IntoIterator<Item = U>,
551 {
552 self.flat_map_unordered(q!(|d| d))
553 }
554
555 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
556 /// `f`, preserving the order of the elements.
557 ///
558 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
559 /// not modify or take ownership of the values. If you need to modify the values while filtering
560 /// use [`Stream::filter_map`] instead.
561 ///
562 /// # Example
563 /// ```rust
564 /// # #[cfg(feature = "deploy")] {
565 /// # use hydro_lang::prelude::*;
566 /// # use futures::StreamExt;
567 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
568 /// process
569 /// .source_iter(q!(vec![1, 2, 3, 4]))
570 /// .filter(q!(|&x| x > 2))
571 /// # }, |mut stream| async move {
572 /// // 3, 4
573 /// # for w in (3..5) {
574 /// # assert_eq!(stream.next().await.unwrap(), w);
575 /// # }
576 /// # }));
577 /// # }
578 /// ```
579 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
580 where
581 F: Fn(&T) -> bool + 'a,
582 {
583 let f = f.splice_fn1_borrow_ctx(&self.location).into();
584 Stream::new(
585 self.location.clone(),
586 HydroNode::Filter {
587 f,
588 input: Box::new(self.ir_node.into_inner()),
589 metadata: self.location.new_node_metadata(Self::collection_kind()),
590 },
591 )
592 }
593
594 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
595 ///
596 /// # Example
597 /// ```rust
598 /// # #[cfg(feature = "deploy")] {
599 /// # use hydro_lang::prelude::*;
600 /// # use futures::StreamExt;
601 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
602 /// process
603 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
604 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
605 /// # }, |mut stream| async move {
606 /// // 1, 2
607 /// # for w in (1..3) {
608 /// # assert_eq!(stream.next().await.unwrap(), w);
609 /// # }
610 /// # }));
611 /// # }
612 /// ```
613 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
614 where
615 F: Fn(T) -> Option<U> + 'a,
616 {
617 let f = f.splice_fn1_ctx(&self.location).into();
618 Stream::new(
619 self.location.clone(),
620 HydroNode::FilterMap {
621 f,
622 input: Box::new(self.ir_node.into_inner()),
623 metadata: self
624 .location
625 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
626 },
627 )
628 }
629
630 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
631 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
632 /// If `other` is an empty [`Optional`], no values will be produced.
633 ///
634 /// # Example
635 /// ```rust
636 /// # #[cfg(feature = "deploy")] {
637 /// # use hydro_lang::prelude::*;
638 /// # use futures::StreamExt;
639 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
640 /// let tick = process.tick();
641 /// let batch = process
642 /// .source_iter(q!(vec![1, 2, 3, 4]))
643 /// .batch(&tick, nondet!(/** test */));
644 /// let count = batch.clone().count(); // `count()` returns a singleton
645 /// batch.cross_singleton(count).all_ticks()
646 /// # }, |mut stream| async move {
647 /// // (1, 4), (2, 4), (3, 4), (4, 4)
648 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
649 /// # assert_eq!(stream.next().await.unwrap(), w);
650 /// # }
651 /// # }));
652 /// # }
653 /// ```
654 pub fn cross_singleton<O2>(
655 self,
656 other: impl Into<Optional<O2, L, Bounded>>,
657 ) -> Stream<(T, O2), L, B, O, R>
658 where
659 O2: Clone,
660 {
661 let other: Optional<O2, L, Bounded> = other.into();
662 check_matching_location(&self.location, &other.location);
663
664 Stream::new(
665 self.location.clone(),
666 HydroNode::CrossSingleton {
667 left: Box::new(self.ir_node.into_inner()),
668 right: Box::new(other.ir_node.into_inner()),
669 metadata: self
670 .location
671 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
672 },
673 )
674 }
675
676 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
677 ///
678 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
679 /// leader of a cluster.
680 ///
681 /// # Example
682 /// ```rust
683 /// # #[cfg(feature = "deploy")] {
684 /// # use hydro_lang::prelude::*;
685 /// # use futures::StreamExt;
686 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
687 /// let tick = process.tick();
688 /// // ticks are lazy by default, forces the second tick to run
689 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
690 ///
691 /// let batch_first_tick = process
692 /// .source_iter(q!(vec![1, 2, 3, 4]))
693 /// .batch(&tick, nondet!(/** test */));
694 /// let batch_second_tick = process
695 /// .source_iter(q!(vec![5, 6, 7, 8]))
696 /// .batch(&tick, nondet!(/** test */))
697 /// .defer_tick(); // appears on the second tick
698 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
699 /// batch_first_tick.chain(batch_second_tick)
700 /// .filter_if_some(some_on_first_tick)
701 /// .all_ticks()
702 /// # }, |mut stream| async move {
703 /// // [1, 2, 3, 4]
704 /// # for w in vec![1, 2, 3, 4] {
705 /// # assert_eq!(stream.next().await.unwrap(), w);
706 /// # }
707 /// # }));
708 /// # }
709 /// ```
710 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
711 self.cross_singleton(signal.map(q!(|_u| ())))
712 .map(q!(|(d, _signal)| d))
713 }
714
715 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
716 ///
717 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
718 /// some local state.
719 ///
720 /// # Example
721 /// ```rust
722 /// # #[cfg(feature = "deploy")] {
723 /// # use hydro_lang::prelude::*;
724 /// # use futures::StreamExt;
725 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
726 /// let tick = process.tick();
727 /// // ticks are lazy by default, forces the second tick to run
728 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
729 ///
730 /// let batch_first_tick = process
731 /// .source_iter(q!(vec![1, 2, 3, 4]))
732 /// .batch(&tick, nondet!(/** test */));
733 /// let batch_second_tick = process
734 /// .source_iter(q!(vec![5, 6, 7, 8]))
735 /// .batch(&tick, nondet!(/** test */))
736 /// .defer_tick(); // appears on the second tick
737 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
738 /// batch_first_tick.chain(batch_second_tick)
739 /// .filter_if_none(some_on_first_tick)
740 /// .all_ticks()
741 /// # }, |mut stream| async move {
742 /// // [5, 6, 7, 8]
743 /// # for w in vec![5, 6, 7, 8] {
744 /// # assert_eq!(stream.next().await.unwrap(), w);
745 /// # }
746 /// # }));
747 /// # }
748 /// ```
749 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
750 self.filter_if_some(
751 other
752 .map(q!(|_| ()))
753 .into_singleton()
754 .filter(q!(|o| o.is_none())),
755 )
756 }
757
758 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
759 /// tupled pairs in a non-deterministic order.
760 ///
761 /// # Example
762 /// ```rust
763 /// # #[cfg(feature = "deploy")] {
764 /// # use hydro_lang::prelude::*;
765 /// # use std::collections::HashSet;
766 /// # use futures::StreamExt;
767 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
768 /// let tick = process.tick();
769 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
770 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
771 /// stream1.cross_product(stream2)
772 /// # }, |mut stream| async move {
773 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
774 /// # stream.map(|i| assert!(expected.contains(&i)));
775 /// # }));
776 /// # }
777 /// ```
778 pub fn cross_product<T2, O2: Ordering>(
779 self,
780 other: Stream<T2, L, B, O2, R>,
781 ) -> Stream<(T, T2), L, B, NoOrder, R>
782 where
783 T: Clone,
784 T2: Clone,
785 {
786 check_matching_location(&self.location, &other.location);
787
788 Stream::new(
789 self.location.clone(),
790 HydroNode::CrossProduct {
791 left: Box::new(self.ir_node.into_inner()),
792 right: Box::new(other.ir_node.into_inner()),
793 metadata: self
794 .location
795 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
796 },
797 )
798 }
799
800 /// Takes one stream as input and filters out any duplicate occurrences. The output
801 /// contains all unique values from the input.
802 ///
803 /// # Example
804 /// ```rust
805 /// # #[cfg(feature = "deploy")] {
806 /// # use hydro_lang::prelude::*;
807 /// # use futures::StreamExt;
808 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
809 /// let tick = process.tick();
810 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
811 /// # }, |mut stream| async move {
812 /// # for w in vec![1, 2, 3, 4] {
813 /// # assert_eq!(stream.next().await.unwrap(), w);
814 /// # }
815 /// # }));
816 /// # }
817 /// ```
818 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
819 where
820 T: Eq + Hash,
821 {
822 Stream::new(
823 self.location.clone(),
824 HydroNode::Unique {
825 input: Box::new(self.ir_node.into_inner()),
826 metadata: self
827 .location
828 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
829 },
830 )
831 }
832
833 /// Outputs everything in this stream that is *not* contained in the `other` stream.
834 ///
835 /// The `other` stream must be [`Bounded`], since this function will wait until
836 /// all its elements are available before producing any output.
837 /// # Example
838 /// ```rust
839 /// # #[cfg(feature = "deploy")] {
840 /// # use hydro_lang::prelude::*;
841 /// # use futures::StreamExt;
842 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
843 /// let tick = process.tick();
844 /// let stream = process
845 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
846 /// .batch(&tick, nondet!(/** test */));
847 /// let batch = process
848 /// .source_iter(q!(vec![1, 2]))
849 /// .batch(&tick, nondet!(/** test */));
850 /// stream.filter_not_in(batch).all_ticks()
851 /// # }, |mut stream| async move {
852 /// # for w in vec![3, 4] {
853 /// # assert_eq!(stream.next().await.unwrap(), w);
854 /// # }
855 /// # }));
856 /// # }
857 /// ```
858 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
859 where
860 T: Eq + Hash,
861 {
862 check_matching_location(&self.location, &other.location);
863
864 Stream::new(
865 self.location.clone(),
866 HydroNode::Difference {
867 pos: Box::new(self.ir_node.into_inner()),
868 neg: Box::new(other.ir_node.into_inner()),
869 metadata: self
870 .location
871 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
872 },
873 )
874 }
875
876 /// An operator which allows you to "inspect" each element of a stream without
877 /// modifying it. The closure `f` is called on a reference to each item. This is
878 /// mainly useful for debugging, and should not be used to generate side-effects.
879 ///
880 /// # Example
881 /// ```rust
882 /// # #[cfg(feature = "deploy")] {
883 /// # use hydro_lang::prelude::*;
884 /// # use futures::StreamExt;
885 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
886 /// let nums = process.source_iter(q!(vec![1, 2]));
887 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
888 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
889 /// # }, |mut stream| async move {
890 /// # for w in vec![1, 2] {
891 /// # assert_eq!(stream.next().await.unwrap(), w);
892 /// # }
893 /// # }));
894 /// # }
895 /// ```
896 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
897 where
898 F: Fn(&T) + 'a,
899 {
900 let f = f.splice_fn1_borrow_ctx(&self.location).into();
901
902 Stream::new(
903 self.location.clone(),
904 HydroNode::Inspect {
905 f,
906 input: Box::new(self.ir_node.into_inner()),
907 metadata: self.location.new_node_metadata(Self::collection_kind()),
908 },
909 )
910 }
911
912 /// An operator which allows you to "name" a `HydroNode`.
913 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
914 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
915 {
916 let mut node = self.ir_node.borrow_mut();
917 let metadata = node.metadata_mut();
918 metadata.tag = Some(name.to_string());
919 }
920 self
921 }
922
923 /// Explicitly "casts" the stream to a type with a different ordering
924 /// guarantee. Useful in unsafe code where the ordering cannot be proven
925 /// by the type-system.
926 ///
927 /// # Non-Determinism
928 /// This function is used as an escape hatch, and any mistakes in the
929 /// provided ordering guarantee will propagate into the guarantees
930 /// for the rest of the program.
931 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
932 if O::ORDERING_KIND == O2::ORDERING_KIND {
933 Stream::new(self.location, self.ir_node.into_inner())
934 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
935 // We can always weaken the ordering guarantee
936 Stream::new(
937 self.location.clone(),
938 HydroNode::Cast {
939 inner: Box::new(self.ir_node.into_inner()),
940 metadata: self
941 .location
942 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
943 },
944 )
945 } else {
946 Stream::new(
947 self.location.clone(),
948 HydroNode::ObserveNonDet {
949 inner: Box::new(self.ir_node.into_inner()),
950 trusted: false,
951 metadata: self
952 .location
953 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
954 },
955 )
956 }
957 }
958
959 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
960 // is not observable
961 fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
962 if O::ORDERING_KIND == O2::ORDERING_KIND {
963 Stream::new(self.location, self.ir_node.into_inner())
964 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
965 // We can always weaken the ordering guarantee
966 Stream::new(
967 self.location.clone(),
968 HydroNode::Cast {
969 inner: Box::new(self.ir_node.into_inner()),
970 metadata: self
971 .location
972 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
973 },
974 )
975 } else {
976 Stream::new(
977 self.location.clone(),
978 HydroNode::ObserveNonDet {
979 inner: Box::new(self.ir_node.into_inner()),
980 trusted: true,
981 metadata: self
982 .location
983 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
984 },
985 )
986 }
987 }
988
989 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
990 /// which is always safe because that is the weakest possible guarantee.
991 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
992 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
993 self.assume_ordering::<NoOrder>(nondet)
994 }
995
996 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
997 /// enforcing that `O2` is weaker than the input ordering guarantee.
998 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
999 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1000 self.assume_ordering::<O2>(nondet)
1001 }
1002
1003 /// Explicitly "casts" the stream to a type with a different retries
1004 /// guarantee. Useful in unsafe code where the lack of retries cannot
1005 /// be proven by the type-system.
1006 ///
1007 /// # Non-Determinism
1008 /// This function is used as an escape hatch, and any mistakes in the
1009 /// provided retries guarantee will propagate into the guarantees
1010 /// for the rest of the program.
1011 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1012 if R::RETRIES_KIND == R2::RETRIES_KIND {
1013 Stream::new(self.location, self.ir_node.into_inner())
1014 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1015 // We can always weaken the retries guarantee
1016 Stream::new(
1017 self.location.clone(),
1018 HydroNode::Cast {
1019 inner: Box::new(self.ir_node.into_inner()),
1020 metadata: self
1021 .location
1022 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1023 },
1024 )
1025 } else {
1026 Stream::new(
1027 self.location.clone(),
1028 HydroNode::ObserveNonDet {
1029 inner: Box::new(self.ir_node.into_inner()),
1030 trusted: false,
1031 metadata: self
1032 .location
1033 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1034 },
1035 )
1036 }
1037 }
1038
1039 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1040 // is not observable
1041 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1042 if R::RETRIES_KIND == R2::RETRIES_KIND {
1043 Stream::new(self.location, self.ir_node.into_inner())
1044 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1045 // We can always weaken the retries guarantee
1046 Stream::new(
1047 self.location.clone(),
1048 HydroNode::Cast {
1049 inner: Box::new(self.ir_node.into_inner()),
1050 metadata: self
1051 .location
1052 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1053 },
1054 )
1055 } else {
1056 Stream::new(
1057 self.location.clone(),
1058 HydroNode::ObserveNonDet {
1059 inner: Box::new(self.ir_node.into_inner()),
1060 trusted: true,
1061 metadata: self
1062 .location
1063 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1064 },
1065 )
1066 }
1067 }
1068
1069 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1070 /// which is always safe because that is the weakest possible guarantee.
1071 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1072 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1073 self.assume_retries::<AtLeastOnce>(nondet)
1074 }
1075
1076 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1077 /// enforcing that `R2` is weaker than the input retries guarantee.
1078 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1079 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1080 self.assume_retries::<R2>(nondet)
1081 }
1082}
1083
1084impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1085where
1086 L: Location<'a>,
1087{
1088 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1089 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1090 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1091 self.assume_retries(
1092 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1093 )
1094 }
1095}
1096
1097impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1098where
1099 L: Location<'a>,
1100{
1101 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1102 ///
1103 /// # Example
1104 /// ```rust
1105 /// # #[cfg(feature = "deploy")] {
1106 /// # use hydro_lang::prelude::*;
1107 /// # use futures::StreamExt;
1108 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1109 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1110 /// # }, |mut stream| async move {
1111 /// // 1, 2, 3
1112 /// # for w in vec![1, 2, 3] {
1113 /// # assert_eq!(stream.next().await.unwrap(), w);
1114 /// # }
1115 /// # }));
1116 /// # }
1117 /// ```
1118 pub fn cloned(self) -> Stream<T, L, B, O, R>
1119 where
1120 T: Clone,
1121 {
1122 self.map(q!(|d| d.clone()))
1123 }
1124}
1125
1126impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1127where
1128 L: Location<'a>,
1129{
1130 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1131 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1132 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1133 ///
1134 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1135 /// and there may be duplicates.
1136 ///
1137 /// # Example
1138 /// ```rust
1139 /// # #[cfg(feature = "deploy")] {
1140 /// # use hydro_lang::prelude::*;
1141 /// # use futures::StreamExt;
1142 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1143 /// let tick = process.tick();
1144 /// let bools = process.source_iter(q!(vec![false, true, false]));
1145 /// let batch = bools.batch(&tick, nondet!(/** test */));
1146 /// batch
1147 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1148 /// .all_ticks()
1149 /// # }, |mut stream| async move {
1150 /// // true
1151 /// # assert_eq!(stream.next().await.unwrap(), true);
1152 /// # }));
1153 /// # }
1154 /// ```
1155 pub fn fold_commutative_idempotent<A, I, F>(
1156 self,
1157 init: impl IntoQuotedMut<'a, I, L>,
1158 comb: impl IntoQuotedMut<'a, F, L>,
1159 ) -> Singleton<A, L, B>
1160 where
1161 I: Fn() -> A + 'a,
1162 F: Fn(&mut A, T),
1163 {
1164 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1165 self.assume_ordering(nondet)
1166 .assume_retries(nondet)
1167 .fold(init, comb)
1168 }
1169
1170 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1171 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1172 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1173 /// reference, so that it can be modified in place.
1174 ///
1175 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1176 /// and there may be duplicates.
1177 ///
1178 /// # Example
1179 /// ```rust
1180 /// # #[cfg(feature = "deploy")] {
1181 /// # use hydro_lang::prelude::*;
1182 /// # use futures::StreamExt;
1183 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1184 /// let tick = process.tick();
1185 /// let bools = process.source_iter(q!(vec![false, true, false]));
1186 /// let batch = bools.batch(&tick, nondet!(/** test */));
1187 /// batch
1188 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1189 /// .all_ticks()
1190 /// # }, |mut stream| async move {
1191 /// // true
1192 /// # assert_eq!(stream.next().await.unwrap(), true);
1193 /// # }));
1194 /// # }
1195 /// ```
1196 pub fn reduce_commutative_idempotent<F>(
1197 self,
1198 comb: impl IntoQuotedMut<'a, F, L>,
1199 ) -> Optional<T, L, B>
1200 where
1201 F: Fn(&mut T, T) + 'a,
1202 {
1203 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1204 self.assume_retries(nondet).reduce_commutative(comb)
1205 }
1206
1207 // only for internal APIs that have been carefully vetted, will eventually be removed once we
1208 // have algebraic verification of these properties
1209 fn reduce_commutative_idempotent_trusted<F>(
1210 self,
1211 comb: impl IntoQuotedMut<'a, F, L>,
1212 ) -> Optional<T, L, B>
1213 where
1214 F: Fn(&mut T, T) + 'a,
1215 {
1216 self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1217 .reduce_commutative_trusted(comb)
1218 }
1219
1220 /// Computes the maximum element in the stream as an [`Optional`], which
1221 /// will be empty until the first element in the input arrives.
1222 ///
1223 /// # Example
1224 /// ```rust
1225 /// # #[cfg(feature = "deploy")] {
1226 /// # use hydro_lang::prelude::*;
1227 /// # use futures::StreamExt;
1228 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229 /// let tick = process.tick();
1230 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1231 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1232 /// batch.max().all_ticks()
1233 /// # }, |mut stream| async move {
1234 /// // 4
1235 /// # assert_eq!(stream.next().await.unwrap(), 4);
1236 /// # }));
1237 /// # }
1238 /// ```
1239 pub fn max(self) -> Optional<T, L, B>
1240 where
1241 T: Ord,
1242 {
1243 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1244 if new > *curr {
1245 *curr = new;
1246 }
1247 }))
1248 }
1249
1250 /// Computes the minimum element in the stream as an [`Optional`], which
1251 /// will be empty until the first element in the input arrives.
1252 ///
1253 /// # Example
1254 /// ```rust
1255 /// # #[cfg(feature = "deploy")] {
1256 /// # use hydro_lang::prelude::*;
1257 /// # use futures::StreamExt;
1258 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259 /// let tick = process.tick();
1260 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1261 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1262 /// batch.min().all_ticks()
1263 /// # }, |mut stream| async move {
1264 /// // 1
1265 /// # assert_eq!(stream.next().await.unwrap(), 1);
1266 /// # }));
1267 /// # }
1268 /// ```
1269 pub fn min(self) -> Optional<T, L, B>
1270 where
1271 T: Ord,
1272 {
1273 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1274 if new < *curr {
1275 *curr = new;
1276 }
1277 }))
1278 }
1279}
1280
1281impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1282where
1283 L: Location<'a>,
1284{
1285 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1286 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1287 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1288 ///
1289 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1290 ///
1291 /// # Example
1292 /// ```rust
1293 /// # #[cfg(feature = "deploy")] {
1294 /// # use hydro_lang::prelude::*;
1295 /// # use futures::StreamExt;
1296 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297 /// let tick = process.tick();
1298 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1299 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1300 /// batch
1301 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1302 /// .all_ticks()
1303 /// # }, |mut stream| async move {
1304 /// // 10
1305 /// # assert_eq!(stream.next().await.unwrap(), 10);
1306 /// # }));
1307 /// # }
1308 /// ```
1309 pub fn fold_commutative<A, I, F>(
1310 self,
1311 init: impl IntoQuotedMut<'a, I, L>,
1312 comb: impl IntoQuotedMut<'a, F, L>,
1313 ) -> Singleton<A, L, B>
1314 where
1315 I: Fn() -> A + 'a,
1316 F: Fn(&mut A, T),
1317 {
1318 let nondet = nondet!(/** the combinator function is commutative */);
1319 self.assume_ordering(nondet).fold(init, comb)
1320 }
1321
1322 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1323 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1324 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1325 /// reference, so that it can be modified in place.
1326 ///
1327 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1328 ///
1329 /// # Example
1330 /// ```rust
1331 /// # #[cfg(feature = "deploy")] {
1332 /// # use hydro_lang::prelude::*;
1333 /// # use futures::StreamExt;
1334 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335 /// let tick = process.tick();
1336 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1337 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1338 /// batch
1339 /// .reduce_commutative(q!(|curr, new| *curr += new))
1340 /// .all_ticks()
1341 /// # }, |mut stream| async move {
1342 /// // 10
1343 /// # assert_eq!(stream.next().await.unwrap(), 10);
1344 /// # }));
1345 /// # }
1346 /// ```
1347 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1348 where
1349 F: Fn(&mut T, T) + 'a,
1350 {
1351 let nondet = nondet!(/** the combinator function is commutative */);
1352 self.assume_ordering(nondet).reduce(comb)
1353 }
1354
1355 fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1356 where
1357 F: Fn(&mut T, T) + 'a,
1358 {
1359 let ordered = if B::BOUNDED {
1360 self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1361 } else {
1362 self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1363 };
1364
1365 ordered.reduce(comb)
1366 }
1367
1368 /// Computes the number of elements in the stream as a [`Singleton`].
1369 ///
1370 /// # Example
1371 /// ```rust
1372 /// # #[cfg(feature = "deploy")] {
1373 /// # use hydro_lang::prelude::*;
1374 /// # use futures::StreamExt;
1375 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1376 /// let tick = process.tick();
1377 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1378 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1379 /// batch.count().all_ticks()
1380 /// # }, |mut stream| async move {
1381 /// // 4
1382 /// # assert_eq!(stream.next().await.unwrap(), 4);
1383 /// # }));
1384 /// # }
1385 /// ```
1386 pub fn count(self) -> Singleton<usize, L, B> {
1387 self.assume_ordering_trusted(nondet!(
1388 /// Order does not affect eventual count, and also does not affect intermediate states.
1389 ))
1390 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1391 }
1392}
1393
1394impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1395where
1396 L: Location<'a>,
1397{
1398 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1399 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1400 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1401 ///
1402 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1403 ///
1404 /// # Example
1405 /// ```rust
1406 /// # #[cfg(feature = "deploy")] {
1407 /// # use hydro_lang::prelude::*;
1408 /// # use futures::StreamExt;
1409 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410 /// let tick = process.tick();
1411 /// let bools = process.source_iter(q!(vec![false, true, false]));
1412 /// let batch = bools.batch(&tick, nondet!(/** test */));
1413 /// batch
1414 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1415 /// .all_ticks()
1416 /// # }, |mut stream| async move {
1417 /// // true
1418 /// # assert_eq!(stream.next().await.unwrap(), true);
1419 /// # }));
1420 /// # }
1421 /// ```
1422 pub fn fold_idempotent<A, I, F>(
1423 self,
1424 init: impl IntoQuotedMut<'a, I, L>,
1425 comb: impl IntoQuotedMut<'a, F, L>,
1426 ) -> Singleton<A, L, B>
1427 where
1428 I: Fn() -> A + 'a,
1429 F: Fn(&mut A, T),
1430 {
1431 let nondet = nondet!(/** the combinator function is idempotent */);
1432 self.assume_retries(nondet).fold(init, comb)
1433 }
1434
1435 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1436 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1437 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1438 /// reference, so that it can be modified in place.
1439 ///
1440 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1441 ///
1442 /// # Example
1443 /// ```rust
1444 /// # #[cfg(feature = "deploy")] {
1445 /// # use hydro_lang::prelude::*;
1446 /// # use futures::StreamExt;
1447 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1448 /// let tick = process.tick();
1449 /// let bools = process.source_iter(q!(vec![false, true, false]));
1450 /// let batch = bools.batch(&tick, nondet!(/** test */));
1451 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1452 /// # }, |mut stream| async move {
1453 /// // true
1454 /// # assert_eq!(stream.next().await.unwrap(), true);
1455 /// # }));
1456 /// # }
1457 /// ```
1458 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1459 where
1460 F: Fn(&mut T, T) + 'a,
1461 {
1462 let nondet = nondet!(/** the combinator function is idempotent */);
1463 self.assume_retries(nondet).reduce(comb)
1464 }
1465
1466 /// Computes the first element in the stream as an [`Optional`], which
1467 /// will be empty until the first element in the input arrives.
1468 ///
1469 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1470 /// re-ordering of elements may cause the first element to change.
1471 ///
1472 /// # Example
1473 /// ```rust
1474 /// # #[cfg(feature = "deploy")] {
1475 /// # use hydro_lang::prelude::*;
1476 /// # use futures::StreamExt;
1477 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1478 /// let tick = process.tick();
1479 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1480 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1481 /// batch.first().all_ticks()
1482 /// # }, |mut stream| async move {
1483 /// // 1
1484 /// # assert_eq!(stream.next().await.unwrap(), 1);
1485 /// # }));
1486 /// # }
1487 /// ```
1488 pub fn first(self) -> Optional<T, L, B> {
1489 self.reduce_idempotent(q!(|_, _| {}))
1490 }
1491
1492 /// Computes the last element in the stream as an [`Optional`], which
1493 /// will be empty until an element in the input arrives.
1494 ///
1495 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1496 /// re-ordering of elements may cause the last element to change.
1497 ///
1498 /// # Example
1499 /// ```rust
1500 /// # #[cfg(feature = "deploy")] {
1501 /// # use hydro_lang::prelude::*;
1502 /// # use futures::StreamExt;
1503 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1504 /// let tick = process.tick();
1505 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1506 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1507 /// batch.last().all_ticks()
1508 /// # }, |mut stream| async move {
1509 /// // 4
1510 /// # assert_eq!(stream.next().await.unwrap(), 4);
1511 /// # }));
1512 /// # }
1513 /// ```
1514 pub fn last(self) -> Optional<T, L, B> {
1515 self.reduce_idempotent(q!(|curr, new| *curr = new))
1516 }
1517}
1518
1519impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1520where
1521 L: Location<'a>,
1522{
1523 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1524 ///
1525 /// # Example
1526 /// ```rust
1527 /// # #[cfg(feature = "deploy")] {
1528 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1529 /// # use futures::StreamExt;
1530 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1531 /// let tick = process.tick();
1532 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1533 /// numbers.enumerate()
1534 /// # }, |mut stream| async move {
1535 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1536 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1537 /// # assert_eq!(stream.next().await.unwrap(), w);
1538 /// # }
1539 /// # }));
1540 /// # }
1541 /// ```
1542 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1543 Stream::new(
1544 self.location.clone(),
1545 HydroNode::Enumerate {
1546 input: Box::new(self.ir_node.into_inner()),
1547 metadata: self.location.new_node_metadata(Stream::<
1548 (usize, T),
1549 L,
1550 B,
1551 TotalOrder,
1552 ExactlyOnce,
1553 >::collection_kind()),
1554 },
1555 )
1556 }
1557
1558 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1559 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1560 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1561 ///
1562 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1563 /// to depend on the order of elements in the stream.
1564 ///
1565 /// # Example
1566 /// ```rust
1567 /// # #[cfg(feature = "deploy")] {
1568 /// # use hydro_lang::prelude::*;
1569 /// # use futures::StreamExt;
1570 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1571 /// let tick = process.tick();
1572 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1573 /// let batch = words.batch(&tick, nondet!(/** test */));
1574 /// batch
1575 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1576 /// .all_ticks()
1577 /// # }, |mut stream| async move {
1578 /// // "HELLOWORLD"
1579 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1580 /// # }));
1581 /// # }
1582 /// ```
1583 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1584 self,
1585 init: impl IntoQuotedMut<'a, I, L>,
1586 comb: impl IntoQuotedMut<'a, F, L>,
1587 ) -> Singleton<A, L, B> {
1588 let init = init.splice_fn0_ctx(&self.location).into();
1589 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1590
1591 let core = HydroNode::Fold {
1592 init,
1593 acc: comb,
1594 input: Box::new(self.ir_node.into_inner()),
1595 metadata: self
1596 .location
1597 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1598 };
1599
1600 Singleton::new(self.location, core)
1601 }
1602
1603 /// Collects all the elements of this stream into a single [`Vec`] element.
1604 ///
1605 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1606 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1607 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1608 /// the vector at an arbitrary point in time.
1609 ///
1610 /// # Example
1611 /// ```rust
1612 /// # #[cfg(feature = "deploy")] {
1613 /// # use hydro_lang::prelude::*;
1614 /// # use futures::StreamExt;
1615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1616 /// let tick = process.tick();
1617 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1618 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1619 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1620 /// # }, |mut stream| async move {
1621 /// // [ vec![1, 2, 3, 4] ]
1622 /// # for w in vec![vec![1, 2, 3, 4]] {
1623 /// # assert_eq!(stream.next().await.unwrap(), w);
1624 /// # }
1625 /// # }));
1626 /// # }
1627 /// ```
1628 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1629 self.fold(
1630 q!(|| vec![]),
1631 q!(|acc, v| {
1632 acc.push(v);
1633 }),
1634 )
1635 }
1636
1637 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1638 /// and emitting each intermediate result.
1639 ///
1640 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1641 /// containing all intermediate accumulated values. The scan operation can also terminate early
1642 /// by returning `None`.
1643 ///
1644 /// The function takes a mutable reference to the accumulator and the current element, and returns
1645 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1646 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1647 ///
1648 /// # Examples
1649 ///
1650 /// Basic usage - running sum:
1651 /// ```rust
1652 /// # #[cfg(feature = "deploy")] {
1653 /// # use hydro_lang::prelude::*;
1654 /// # use futures::StreamExt;
1655 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1656 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1657 /// q!(|| 0),
1658 /// q!(|acc, x| {
1659 /// *acc += x;
1660 /// Some(*acc)
1661 /// }),
1662 /// )
1663 /// # }, |mut stream| async move {
1664 /// // Output: 1, 3, 6, 10
1665 /// # for w in vec![1, 3, 6, 10] {
1666 /// # assert_eq!(stream.next().await.unwrap(), w);
1667 /// # }
1668 /// # }));
1669 /// # }
1670 /// ```
1671 ///
1672 /// Early termination example:
1673 /// ```rust
1674 /// # #[cfg(feature = "deploy")] {
1675 /// # use hydro_lang::prelude::*;
1676 /// # use futures::StreamExt;
1677 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1678 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1679 /// q!(|| 1),
1680 /// q!(|state, x| {
1681 /// *state = *state * x;
1682 /// if *state > 6 {
1683 /// None // Terminate the stream
1684 /// } else {
1685 /// Some(-*state)
1686 /// }
1687 /// }),
1688 /// )
1689 /// # }, |mut stream| async move {
1690 /// // Output: -1, -2, -6
1691 /// # for w in vec![-1, -2, -6] {
1692 /// # assert_eq!(stream.next().await.unwrap(), w);
1693 /// # }
1694 /// # }));
1695 /// # }
1696 /// ```
1697 pub fn scan<A, U, I, F>(
1698 self,
1699 init: impl IntoQuotedMut<'a, I, L>,
1700 f: impl IntoQuotedMut<'a, F, L>,
1701 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1702 where
1703 I: Fn() -> A + 'a,
1704 F: Fn(&mut A, T) -> Option<U> + 'a,
1705 {
1706 let init = init.splice_fn0_ctx(&self.location).into();
1707 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1708
1709 Stream::new(
1710 self.location.clone(),
1711 HydroNode::Scan {
1712 init,
1713 acc: f,
1714 input: Box::new(self.ir_node.into_inner()),
1715 metadata: self.location.new_node_metadata(
1716 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1717 ),
1718 },
1719 )
1720 }
1721
1722 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1723 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1724 /// until the first element in the input arrives.
1725 ///
1726 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1727 /// to depend on the order of elements in the stream.
1728 ///
1729 /// # Example
1730 /// ```rust
1731 /// # #[cfg(feature = "deploy")] {
1732 /// # use hydro_lang::prelude::*;
1733 /// # use futures::StreamExt;
1734 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735 /// let tick = process.tick();
1736 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1737 /// let batch = words.batch(&tick, nondet!(/** test */));
1738 /// batch
1739 /// .map(q!(|x| x.to_string()))
1740 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1741 /// .all_ticks()
1742 /// # }, |mut stream| async move {
1743 /// // "HELLOWORLD"
1744 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1745 /// # }));
1746 /// # }
1747 /// ```
1748 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1749 self,
1750 comb: impl IntoQuotedMut<'a, F, L>,
1751 ) -> Optional<T, L, B> {
1752 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1753 let core = HydroNode::Reduce {
1754 f,
1755 input: Box::new(self.ir_node.into_inner()),
1756 metadata: self
1757 .location
1758 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1759 };
1760
1761 Optional::new(self.location, core)
1762 }
1763}
1764
1765impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1766 /// Produces a new stream that interleaves the elements of the two input streams.
1767 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1768 ///
1769 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1770 /// [`Bounded`], you can use [`Stream::chain`] instead.
1771 ///
1772 /// # Example
1773 /// ```rust
1774 /// # #[cfg(feature = "deploy")] {
1775 /// # use hydro_lang::prelude::*;
1776 /// # use futures::StreamExt;
1777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1778 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1779 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1780 /// # }, |mut stream| async move {
1781 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1782 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1783 /// # assert_eq!(stream.next().await.unwrap(), w);
1784 /// # }
1785 /// # }));
1786 /// # }
1787 /// ```
1788 pub fn interleave<O2: Ordering, R2: Retries>(
1789 self,
1790 other: Stream<T, L, Unbounded, O2, R2>,
1791 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1792 where
1793 R: MinRetries<R2>,
1794 {
1795 Stream::new(
1796 self.location.clone(),
1797 HydroNode::Chain {
1798 first: Box::new(self.ir_node.into_inner()),
1799 second: Box::new(other.ir_node.into_inner()),
1800 metadata: self.location.new_node_metadata(Stream::<
1801 T,
1802 L,
1803 Unbounded,
1804 NoOrder,
1805 <R as MinRetries<R2>>::Min,
1806 >::collection_kind()),
1807 },
1808 )
1809 }
1810}
1811
1812impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1813where
1814 L: Location<'a>,
1815{
1816 /// Produces a new stream that emits the input elements in sorted order.
1817 ///
1818 /// The input stream can have any ordering guarantee, but the output stream
1819 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1820 /// elements in the input stream are available, so it requires the input stream
1821 /// to be [`Bounded`].
1822 ///
1823 /// # Example
1824 /// ```rust
1825 /// # #[cfg(feature = "deploy")] {
1826 /// # use hydro_lang::prelude::*;
1827 /// # use futures::StreamExt;
1828 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1829 /// let tick = process.tick();
1830 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1831 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1832 /// batch.sort().all_ticks()
1833 /// # }, |mut stream| async move {
1834 /// // 1, 2, 3, 4
1835 /// # for w in (1..5) {
1836 /// # assert_eq!(stream.next().await.unwrap(), w);
1837 /// # }
1838 /// # }));
1839 /// # }
1840 /// ```
1841 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1842 where
1843 T: Ord,
1844 {
1845 Stream::new(
1846 self.location.clone(),
1847 HydroNode::Sort {
1848 input: Box::new(self.ir_node.into_inner()),
1849 metadata: self
1850 .location
1851 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1852 },
1853 )
1854 }
1855
1856 /// Produces a new stream that first emits the elements of the `self` stream,
1857 /// and then emits the elements of the `other` stream. The output stream has
1858 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1859 /// [`TotalOrder`] guarantee.
1860 ///
1861 /// Currently, both input streams must be [`Bounded`]. This operator will block
1862 /// on the first stream until all its elements are available. In a future version,
1863 /// we will relax the requirement on the `other` stream.
1864 ///
1865 /// # Example
1866 /// ```rust
1867 /// # #[cfg(feature = "deploy")] {
1868 /// # use hydro_lang::prelude::*;
1869 /// # use futures::StreamExt;
1870 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1871 /// let tick = process.tick();
1872 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1873 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1874 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1875 /// # }, |mut stream| async move {
1876 /// // 2, 3, 4, 5, 1, 2, 3, 4
1877 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1878 /// # assert_eq!(stream.next().await.unwrap(), w);
1879 /// # }
1880 /// # }));
1881 /// # }
1882 /// ```
1883 pub fn chain<O2: Ordering, R2: Retries>(
1884 self,
1885 other: Stream<T, L, Bounded, O2, R2>,
1886 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1887 where
1888 O: MinOrder<O2>,
1889 R: MinRetries<R2>,
1890 {
1891 check_matching_location(&self.location, &other.location);
1892
1893 Stream::new(
1894 self.location.clone(),
1895 HydroNode::Chain {
1896 first: Box::new(self.ir_node.into_inner()),
1897 second: Box::new(other.ir_node.into_inner()),
1898 metadata: self.location.new_node_metadata(Stream::<
1899 T,
1900 L,
1901 Bounded,
1902 <O as MinOrder<O2>>::Min,
1903 <R as MinRetries<R2>>::Min,
1904 >::collection_kind()),
1905 },
1906 )
1907 }
1908
1909 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1910 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1911 /// because this is compiled into a nested loop.
1912 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1913 self,
1914 other: Stream<T2, L, Bounded, O2, R>,
1915 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1916 where
1917 T: Clone,
1918 T2: Clone,
1919 {
1920 check_matching_location(&self.location, &other.location);
1921
1922 Stream::new(
1923 self.location.clone(),
1924 HydroNode::CrossProduct {
1925 left: Box::new(self.ir_node.into_inner()),
1926 right: Box::new(other.ir_node.into_inner()),
1927 metadata: self.location.new_node_metadata(Stream::<
1928 (T, T2),
1929 L,
1930 Bounded,
1931 <O2 as MinOrder<O>>::Min,
1932 R,
1933 >::collection_kind()),
1934 },
1935 )
1936 }
1937
1938 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1939 /// `self` used as the values for *each* key.
1940 ///
1941 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1942 /// values. For example, it can be used to send the same set of elements to several cluster
1943 /// members, if the membership information is available as a [`KeyedSingleton`].
1944 ///
1945 /// # Example
1946 /// ```rust
1947 /// # #[cfg(feature = "deploy")] {
1948 /// # use hydro_lang::prelude::*;
1949 /// # use futures::StreamExt;
1950 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1951 /// # let tick = process.tick();
1952 /// let keyed_singleton = // { 1: (), 2: () }
1953 /// # process
1954 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1955 /// # .into_keyed()
1956 /// # .batch(&tick, nondet!(/** test */))
1957 /// # .first();
1958 /// let stream = // [ "a", "b" ]
1959 /// # process
1960 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1961 /// # .batch(&tick, nondet!(/** test */));
1962 /// stream.repeat_with_keys(keyed_singleton)
1963 /// # .entries().all_ticks()
1964 /// # }, |mut stream| async move {
1965 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1966 /// # let mut results = Vec::new();
1967 /// # for _ in 0..4 {
1968 /// # results.push(stream.next().await.unwrap());
1969 /// # }
1970 /// # results.sort();
1971 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1972 /// # }));
1973 /// # }
1974 /// ```
1975 pub fn repeat_with_keys<K, V2>(
1976 self,
1977 keys: KeyedSingleton<K, V2, L, Bounded>,
1978 ) -> KeyedStream<K, T, L, Bounded, O, R>
1979 where
1980 K: Clone,
1981 T: Clone,
1982 {
1983 keys.keys()
1984 .weaken_retries()
1985 .assume_ordering_trusted::<TotalOrder>(
1986 nondet!(/** keyed stream does not depend on ordering of keys */),
1987 )
1988 .cross_product_nested_loop(self)
1989 .into_keyed()
1990 }
1991}
1992
1993impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1994where
1995 L: Location<'a>,
1996{
1997 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1998 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1999 /// by equi-joining the two streams on the key attribute `K`.
2000 ///
2001 /// # Example
2002 /// ```rust
2003 /// # #[cfg(feature = "deploy")] {
2004 /// # use hydro_lang::prelude::*;
2005 /// # use std::collections::HashSet;
2006 /// # use futures::StreamExt;
2007 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2008 /// let tick = process.tick();
2009 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2010 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2011 /// stream1.join(stream2)
2012 /// # }, |mut stream| async move {
2013 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2014 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2015 /// # stream.map(|i| assert!(expected.contains(&i)));
2016 /// # }));
2017 /// # }
2018 pub fn join<V2, O2: Ordering, R2: Retries>(
2019 self,
2020 n: Stream<(K, V2), L, B, O2, R2>,
2021 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2022 where
2023 K: Eq + Hash,
2024 R: MinRetries<R2>,
2025 {
2026 check_matching_location(&self.location, &n.location);
2027
2028 Stream::new(
2029 self.location.clone(),
2030 HydroNode::Join {
2031 left: Box::new(self.ir_node.into_inner()),
2032 right: Box::new(n.ir_node.into_inner()),
2033 metadata: self.location.new_node_metadata(Stream::<
2034 (K, (V1, V2)),
2035 L,
2036 B,
2037 NoOrder,
2038 <R as MinRetries<R2>>::Min,
2039 >::collection_kind()),
2040 },
2041 )
2042 }
2043
2044 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2045 /// computes the anti-join of the items in the input -- i.e. returns
2046 /// unique items in the first input that do not have a matching key
2047 /// in the second input.
2048 ///
2049 /// # Example
2050 /// ```rust
2051 /// # #[cfg(feature = "deploy")] {
2052 /// # use hydro_lang::prelude::*;
2053 /// # use futures::StreamExt;
2054 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2055 /// let tick = process.tick();
2056 /// let stream = process
2057 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2058 /// .batch(&tick, nondet!(/** test */));
2059 /// let batch = process
2060 /// .source_iter(q!(vec![1, 2]))
2061 /// .batch(&tick, nondet!(/** test */));
2062 /// stream.anti_join(batch).all_ticks()
2063 /// # }, |mut stream| async move {
2064 /// # for w in vec![(3, 'c'), (4, 'd')] {
2065 /// # assert_eq!(stream.next().await.unwrap(), w);
2066 /// # }
2067 /// # }));
2068 /// # }
2069 pub fn anti_join<O2: Ordering, R2: Retries>(
2070 self,
2071 n: Stream<K, L, Bounded, O2, R2>,
2072 ) -> Stream<(K, V1), L, B, O, R>
2073 where
2074 K: Eq + Hash,
2075 {
2076 check_matching_location(&self.location, &n.location);
2077
2078 Stream::new(
2079 self.location.clone(),
2080 HydroNode::AntiJoin {
2081 pos: Box::new(self.ir_node.into_inner()),
2082 neg: Box::new(n.ir_node.into_inner()),
2083 metadata: self
2084 .location
2085 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2086 },
2087 )
2088 }
2089}
2090
2091impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2092 Stream<(K, V), L, B, O, R>
2093{
2094 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2095 /// is used as the key and the second element is added to the entries associated with that key.
2096 ///
2097 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2098 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2099 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2100 /// total ordering _within_ each group but no ordering _across_ groups.
2101 ///
2102 /// # Example
2103 /// ```rust
2104 /// # #[cfg(feature = "deploy")] {
2105 /// # use hydro_lang::prelude::*;
2106 /// # use futures::StreamExt;
2107 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2108 /// process
2109 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2110 /// .into_keyed()
2111 /// # .entries()
2112 /// # }, |mut stream| async move {
2113 /// // { 1: [2, 3], 2: [4] }
2114 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2115 /// # assert_eq!(stream.next().await.unwrap(), w);
2116 /// # }
2117 /// # }));
2118 /// # }
2119 /// ```
2120 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2121 KeyedStream::new(
2122 self.location.clone(),
2123 HydroNode::Cast {
2124 inner: Box::new(self.ir_node.into_inner()),
2125 metadata: self
2126 .location
2127 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2128 },
2129 )
2130 }
2131}
2132
2133impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2134where
2135 K: Eq + Hash,
2136 L: Location<'a>,
2137{
2138 #[deprecated = "use .into_keyed().fold(...) instead"]
2139 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2140 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2141 /// in the second element are accumulated via the `comb` closure.
2142 ///
2143 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2144 /// to depend on the order of elements in the stream.
2145 ///
2146 /// If the input and output value types are the same and do not require initialization then use
2147 /// [`Stream::reduce_keyed`].
2148 ///
2149 /// # Example
2150 /// ```rust
2151 /// # #[cfg(feature = "deploy")] {
2152 /// # use hydro_lang::prelude::*;
2153 /// # use futures::StreamExt;
2154 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2155 /// let tick = process.tick();
2156 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2157 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2158 /// batch
2159 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2160 /// .all_ticks()
2161 /// # }, |mut stream| async move {
2162 /// // (1, 5), (2, 7)
2163 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2164 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2165 /// # }));
2166 /// # }
2167 /// ```
2168 pub fn fold_keyed<A, I, F>(
2169 self,
2170 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2171 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2172 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2173 where
2174 I: Fn() -> A + 'a,
2175 F: Fn(&mut A, V) + 'a,
2176 {
2177 self.into_keyed().fold(init, comb).entries()
2178 }
2179
2180 #[deprecated = "use .into_keyed().reduce(...) instead"]
2181 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2182 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2183 /// in the second element are accumulated via the `comb` closure.
2184 ///
2185 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2186 /// to depend on the order of elements in the stream.
2187 ///
2188 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2189 ///
2190 /// # Example
2191 /// ```rust
2192 /// # #[cfg(feature = "deploy")] {
2193 /// # use hydro_lang::prelude::*;
2194 /// # use futures::StreamExt;
2195 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2196 /// let tick = process.tick();
2197 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2198 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2199 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2200 /// # }, |mut stream| async move {
2201 /// // (1, 5), (2, 7)
2202 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2203 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2204 /// # }));
2205 /// # }
2206 /// ```
2207 pub fn reduce_keyed<F>(
2208 self,
2209 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2210 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2211 where
2212 F: Fn(&mut V, V) + 'a,
2213 {
2214 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2215
2216 Stream::new(
2217 self.location.clone(),
2218 HydroNode::ReduceKeyed {
2219 f,
2220 input: Box::new(self.ir_node.into_inner()),
2221 metadata: self.location.new_node_metadata(Stream::<
2222 (K, V),
2223 Tick<L>,
2224 Bounded,
2225 NoOrder,
2226 ExactlyOnce,
2227 >::collection_kind()),
2228 },
2229 )
2230 }
2231}
2232
2233impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2234where
2235 K: Eq + Hash,
2236 L: Location<'a>,
2237{
2238 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2239 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2240 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2241 /// in the second element are accumulated via the `comb` closure.
2242 ///
2243 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2244 /// as there may be non-deterministic duplicates.
2245 ///
2246 /// If the input and output value types are the same and do not require initialization then use
2247 /// [`Stream::reduce_keyed_commutative_idempotent`].
2248 ///
2249 /// # Example
2250 /// ```rust
2251 /// # #[cfg(feature = "deploy")] {
2252 /// # use hydro_lang::prelude::*;
2253 /// # use futures::StreamExt;
2254 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2255 /// let tick = process.tick();
2256 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2257 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2258 /// batch
2259 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2260 /// .all_ticks()
2261 /// # }, |mut stream| async move {
2262 /// // (1, false), (2, true)
2263 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2264 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2265 /// # }));
2266 /// # }
2267 /// ```
2268 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2269 self,
2270 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2271 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2272 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2273 where
2274 I: Fn() -> A + 'a,
2275 F: Fn(&mut A, V) + 'a,
2276 {
2277 self.into_keyed()
2278 .fold_commutative_idempotent(init, comb)
2279 .entries()
2280 }
2281
2282 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2283 /// # Example
2284 /// ```rust
2285 /// # #[cfg(feature = "deploy")] {
2286 /// # use hydro_lang::prelude::*;
2287 /// # use futures::StreamExt;
2288 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2289 /// let tick = process.tick();
2290 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2291 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2292 /// batch.keys().all_ticks()
2293 /// # }, |mut stream| async move {
2294 /// // 1, 2
2295 /// # assert_eq!(stream.next().await.unwrap(), 1);
2296 /// # assert_eq!(stream.next().await.unwrap(), 2);
2297 /// # }));
2298 /// # }
2299 /// ```
2300 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2301 self.into_keyed()
2302 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2303 .keys()
2304 }
2305
2306 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2307 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2308 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2309 /// in the second element are accumulated via the `comb` closure.
2310 ///
2311 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2312 /// as there may be non-deterministic duplicates.
2313 ///
2314 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2315 ///
2316 /// # Example
2317 /// ```rust
2318 /// # #[cfg(feature = "deploy")] {
2319 /// # use hydro_lang::prelude::*;
2320 /// # use futures::StreamExt;
2321 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2322 /// let tick = process.tick();
2323 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2324 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2325 /// batch
2326 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2327 /// .all_ticks()
2328 /// # }, |mut stream| async move {
2329 /// // (1, false), (2, true)
2330 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2331 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2332 /// # }));
2333 /// # }
2334 /// ```
2335 pub fn reduce_keyed_commutative_idempotent<F>(
2336 self,
2337 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2338 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2339 where
2340 F: Fn(&mut V, V) + 'a,
2341 {
2342 self.into_keyed()
2343 .reduce_commutative_idempotent(comb)
2344 .entries()
2345 }
2346}
2347
2348impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2349where
2350 K: Eq + Hash,
2351 L: Location<'a>,
2352{
2353 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2354 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2355 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2356 /// in the second element are accumulated via the `comb` closure.
2357 ///
2358 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2359 ///
2360 /// If the input and output value types are the same and do not require initialization then use
2361 /// [`Stream::reduce_keyed_commutative`].
2362 ///
2363 /// # Example
2364 /// ```rust
2365 /// # #[cfg(feature = "deploy")] {
2366 /// # use hydro_lang::prelude::*;
2367 /// # use futures::StreamExt;
2368 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2369 /// let tick = process.tick();
2370 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2371 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2372 /// batch
2373 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2374 /// .all_ticks()
2375 /// # }, |mut stream| async move {
2376 /// // (1, 5), (2, 7)
2377 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2378 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2379 /// # }));
2380 /// # }
2381 /// ```
2382 pub fn fold_keyed_commutative<A, I, F>(
2383 self,
2384 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2385 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2386 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2387 where
2388 I: Fn() -> A + 'a,
2389 F: Fn(&mut A, V) + 'a,
2390 {
2391 self.into_keyed().fold_commutative(init, comb).entries()
2392 }
2393
2394 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2395 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2396 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2397 /// in the second element are accumulated via the `comb` closure.
2398 ///
2399 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2400 ///
2401 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2402 ///
2403 /// # Example
2404 /// ```rust
2405 /// # #[cfg(feature = "deploy")] {
2406 /// # use hydro_lang::prelude::*;
2407 /// # use futures::StreamExt;
2408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2409 /// let tick = process.tick();
2410 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2411 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2412 /// batch
2413 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2414 /// .all_ticks()
2415 /// # }, |mut stream| async move {
2416 /// // (1, 5), (2, 7)
2417 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2418 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2419 /// # }));
2420 /// # }
2421 /// ```
2422 pub fn reduce_keyed_commutative<F>(
2423 self,
2424 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2425 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2426 where
2427 F: Fn(&mut V, V) + 'a,
2428 {
2429 self.into_keyed().reduce_commutative(comb).entries()
2430 }
2431}
2432
2433impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2434where
2435 K: Eq + Hash,
2436 L: Location<'a>,
2437{
2438 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2439 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2440 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2441 /// in the second element are accumulated via the `comb` closure.
2442 ///
2443 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2444 ///
2445 /// If the input and output value types are the same and do not require initialization then use
2446 /// [`Stream::reduce_keyed_idempotent`].
2447 ///
2448 /// # Example
2449 /// ```rust
2450 /// # #[cfg(feature = "deploy")] {
2451 /// # use hydro_lang::prelude::*;
2452 /// # use futures::StreamExt;
2453 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2454 /// let tick = process.tick();
2455 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2456 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2457 /// batch
2458 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2459 /// .all_ticks()
2460 /// # }, |mut stream| async move {
2461 /// // (1, false), (2, true)
2462 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2463 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2464 /// # }));
2465 /// # }
2466 /// ```
2467 pub fn fold_keyed_idempotent<A, I, F>(
2468 self,
2469 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2470 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2471 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2472 where
2473 I: Fn() -> A + 'a,
2474 F: Fn(&mut A, V) + 'a,
2475 {
2476 self.into_keyed().fold_idempotent(init, comb).entries()
2477 }
2478
2479 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2480 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2481 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2482 /// in the second element are accumulated via the `comb` closure.
2483 ///
2484 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2485 ///
2486 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2487 ///
2488 /// # Example
2489 /// ```rust
2490 /// # #[cfg(feature = "deploy")] {
2491 /// # use hydro_lang::prelude::*;
2492 /// # use futures::StreamExt;
2493 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494 /// let tick = process.tick();
2495 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2496 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2497 /// batch
2498 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2499 /// .all_ticks()
2500 /// # }, |mut stream| async move {
2501 /// // (1, false), (2, true)
2502 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2503 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2504 /// # }));
2505 /// # }
2506 /// ```
2507 pub fn reduce_keyed_idempotent<F>(
2508 self,
2509 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2510 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2511 where
2512 F: Fn(&mut V, V) + 'a,
2513 {
2514 self.into_keyed().reduce_idempotent(comb).entries()
2515 }
2516}
2517
2518impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2519where
2520 L: Location<'a> + NoTick,
2521{
2522 /// Returns a stream corresponding to the latest batch of elements being atomically
2523 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2524 /// the order of the input.
2525 ///
2526 /// # Non-Determinism
2527 /// The batch boundaries are non-deterministic and may change across executions.
2528 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2529 Stream::new(
2530 self.location.clone().tick,
2531 HydroNode::Batch {
2532 inner: Box::new(self.ir_node.into_inner()),
2533 metadata: self
2534 .location
2535 .tick
2536 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2537 },
2538 )
2539 }
2540
2541 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2542 /// See [`Stream::atomic`] for more details.
2543 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2544 Stream::new(
2545 self.location.tick.l.clone(),
2546 HydroNode::EndAtomic {
2547 inner: Box::new(self.ir_node.into_inner()),
2548 metadata: self
2549 .location
2550 .tick
2551 .l
2552 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2553 },
2554 )
2555 }
2556}
2557
2558impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2559where
2560 L: Location<'a>,
2561{
2562 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2563 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2564 ///
2565 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2566 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2567 /// argument that declares where the stream will be atomically processed. Batching a stream into
2568 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2569 /// [`Tick`] will introduce asynchrony.
2570 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2571 let out_location = Atomic { tick: tick.clone() };
2572 Stream::new(
2573 out_location.clone(),
2574 HydroNode::BeginAtomic {
2575 inner: Box::new(self.ir_node.into_inner()),
2576 metadata: out_location
2577 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2578 },
2579 )
2580 }
2581
2582 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2583 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2584 /// the order of the input. The output stream will execute in the [`Tick`] that was
2585 /// used to create the atomic section.
2586 ///
2587 /// # Non-Determinism
2588 /// The batch boundaries are non-deterministic and may change across executions.
2589 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2590 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2591 Stream::new(
2592 tick.clone(),
2593 HydroNode::Batch {
2594 inner: Box::new(self.ir_node.into_inner()),
2595 metadata: tick
2596 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2597 },
2598 )
2599 }
2600
2601 /// Given a time interval, returns a stream corresponding to samples taken from the
2602 /// stream roughly at that interval. The output will have elements in the same order
2603 /// as the input, but with arbitrary elements skipped between samples. There is also
2604 /// no guarantee on the exact timing of the samples.
2605 ///
2606 /// # Non-Determinism
2607 /// The output stream is non-deterministic in which elements are sampled, since this
2608 /// is controlled by a clock.
2609 pub fn sample_every(
2610 self,
2611 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2612 nondet: NonDet,
2613 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2614 where
2615 L: NoTick + NoAtomic,
2616 {
2617 let samples = self.location.source_interval(interval, nondet);
2618
2619 let tick = self.location.tick();
2620 self.batch(&tick, nondet)
2621 .filter_if_some(samples.batch(&tick, nondet).first())
2622 .all_ticks()
2623 .weakest_retries()
2624 }
2625
2626 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2627 /// stream has not emitted a value since that duration.
2628 ///
2629 /// # Non-Determinism
2630 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2631 /// samples take place, timeouts may be non-deterministically generated or missed,
2632 /// and the notification of the timeout may be delayed as well. There is also no
2633 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2634 /// detected based on when the next sample is taken.
2635 pub fn timeout(
2636 self,
2637 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2638 nondet: NonDet,
2639 ) -> Optional<(), L, Unbounded>
2640 where
2641 L: NoTick + NoAtomic,
2642 {
2643 let tick = self.location.tick();
2644
2645 let latest_received = self.assume_retries(nondet).fold_commutative(
2646 q!(|| None),
2647 q!(|latest, _| {
2648 *latest = Some(Instant::now());
2649 }),
2650 );
2651
2652 latest_received
2653 .snapshot(&tick, nondet)
2654 .filter_map(q!(move |latest_received| {
2655 if let Some(latest_received) = latest_received {
2656 if Instant::now().duration_since(latest_received) > duration {
2657 Some(())
2658 } else {
2659 None
2660 }
2661 } else {
2662 Some(())
2663 }
2664 }))
2665 .latest()
2666 }
2667}
2668
2669impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2670where
2671 L: Location<'a> + NoTick + NoAtomic,
2672 F: Future<Output = T>,
2673{
2674 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2675 /// Future outputs are produced as available, regardless of input arrival order.
2676 ///
2677 /// # Example
2678 /// ```rust
2679 /// # #[cfg(feature = "deploy")] {
2680 /// # use std::collections::HashSet;
2681 /// # use futures::StreamExt;
2682 /// # use hydro_lang::prelude::*;
2683 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2684 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2685 /// .map(q!(|x| async move {
2686 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2687 /// x
2688 /// }))
2689 /// .resolve_futures()
2690 /// # },
2691 /// # |mut stream| async move {
2692 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2693 /// # let mut output = HashSet::new();
2694 /// # for _ in 1..10 {
2695 /// # output.insert(stream.next().await.unwrap());
2696 /// # }
2697 /// # assert_eq!(
2698 /// # output,
2699 /// # HashSet::<i32>::from_iter(1..10)
2700 /// # );
2701 /// # },
2702 /// # ));
2703 /// # }
2704 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2705 Stream::new(
2706 self.location.clone(),
2707 HydroNode::ResolveFutures {
2708 input: Box::new(self.ir_node.into_inner()),
2709 metadata: self
2710 .location
2711 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2712 },
2713 )
2714 }
2715
2716 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2717 /// Future outputs are produced in the same order as the input stream.
2718 ///
2719 /// # Example
2720 /// ```rust
2721 /// # #[cfg(feature = "deploy")] {
2722 /// # use std::collections::HashSet;
2723 /// # use futures::StreamExt;
2724 /// # use hydro_lang::prelude::*;
2725 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2726 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2727 /// .map(q!(|x| async move {
2728 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2729 /// x
2730 /// }))
2731 /// .resolve_futures_ordered()
2732 /// # },
2733 /// # |mut stream| async move {
2734 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2735 /// # let mut output = Vec::new();
2736 /// # for _ in 1..10 {
2737 /// # output.push(stream.next().await.unwrap());
2738 /// # }
2739 /// # assert_eq!(
2740 /// # output,
2741 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2742 /// # );
2743 /// # },
2744 /// # ));
2745 /// # }
2746 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2747 Stream::new(
2748 self.location.clone(),
2749 HydroNode::ResolveFuturesOrdered {
2750 input: Box::new(self.ir_node.into_inner()),
2751 metadata: self
2752 .location
2753 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2754 },
2755 )
2756 }
2757}
2758
2759impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2760where
2761 L: Location<'a> + NoTick,
2762{
2763 /// Executes the provided closure for every element in this stream.
2764 ///
2765 /// Because the closure may have side effects, the stream must have deterministic order
2766 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2767 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2768 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2769 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2770 let f = f.splice_fn1_ctx(&self.location).into();
2771 self.location
2772 .flow_state()
2773 .borrow_mut()
2774 .push_root(HydroRoot::ForEach {
2775 input: Box::new(self.ir_node.into_inner()),
2776 f,
2777 op_metadata: HydroIrOpMetadata::new(),
2778 });
2779 }
2780
2781 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2782 /// TCP socket to some other server. You should _not_ use this API for interacting with
2783 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2784 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2785 /// interaction with asynchronous sinks.
2786 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2787 where
2788 S: 'a + futures::Sink<T> + Unpin,
2789 {
2790 self.location
2791 .flow_state()
2792 .borrow_mut()
2793 .push_root(HydroRoot::DestSink {
2794 sink: sink.splice_typed_ctx(&self.location).into(),
2795 input: Box::new(self.ir_node.into_inner()),
2796 op_metadata: HydroIrOpMetadata::new(),
2797 });
2798 }
2799}
2800
2801impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2802where
2803 L: Location<'a>,
2804{
2805 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2806 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2807 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2808 Stream::new(
2809 self.location.outer().clone(),
2810 HydroNode::YieldConcat {
2811 inner: Box::new(self.ir_node.into_inner()),
2812 metadata: self
2813 .location
2814 .outer()
2815 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2816 },
2817 )
2818 }
2819
2820 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2821 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2822 ///
2823 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2824 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2825 /// stream's [`Tick`] context.
2826 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2827 let out_location = Atomic {
2828 tick: self.location.clone(),
2829 };
2830
2831 Stream::new(
2832 out_location.clone(),
2833 HydroNode::YieldConcat {
2834 inner: Box::new(self.ir_node.into_inner()),
2835 metadata: out_location
2836 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2837 },
2838 )
2839 }
2840
2841 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2842 ///
2843 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2844 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2845 /// careful when using this operator, as its memory usage will grow linearly over time since it
2846 /// must store its inputs indefinitely.
2847 ///
2848 /// # Example
2849 /// ```rust
2850 /// # #[cfg(feature = "deploy")] {
2851 /// # use hydro_lang::prelude::*;
2852 /// # use futures::StreamExt;
2853 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2854 /// let tick = process.tick();
2855 /// // ticks are lazy by default, forces the second tick to run
2856 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2857 ///
2858 /// let batch_first_tick = process
2859 /// .source_iter(q!(vec![1, 2, 3, 4]))
2860 /// .batch(&tick, nondet!(/** test */));
2861 /// let batch_second_tick = process
2862 /// .source_iter(q!(vec![5, 6, 7, 8]))
2863 /// .batch(&tick, nondet!(/** test */))
2864 /// .defer_tick(); // appears on the second tick
2865 /// batch_first_tick.chain(batch_second_tick)
2866 /// .persist()
2867 /// .all_ticks()
2868 /// # }, |mut stream| async move {
2869 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2870 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2871 /// # assert_eq!(stream.next().await.unwrap(), w);
2872 /// # }
2873 /// # }));
2874 /// # }
2875 /// ```
2876 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2877 where
2878 T: Clone,
2879 {
2880 Stream::new(
2881 self.location.clone(),
2882 HydroNode::Persist {
2883 inner: Box::new(self.ir_node.into_inner()),
2884 metadata: self
2885 .location
2886 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2887 },
2888 )
2889 }
2890
2891 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2892 /// always has the elements of `self` at tick `T - 1`.
2893 ///
2894 /// At tick `0`, the output stream is empty, since there is no previous tick.
2895 ///
2896 /// This operator enables stateful iterative processing with ticks, by sending data from one
2897 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2898 ///
2899 /// # Example
2900 /// ```rust
2901 /// # #[cfg(feature = "deploy")] {
2902 /// # use hydro_lang::prelude::*;
2903 /// # use futures::StreamExt;
2904 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2905 /// let tick = process.tick();
2906 /// // ticks are lazy by default, forces the second tick to run
2907 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2908 ///
2909 /// let batch_first_tick = process
2910 /// .source_iter(q!(vec![1, 2, 3, 4]))
2911 /// .batch(&tick, nondet!(/** test */));
2912 /// let batch_second_tick = process
2913 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2914 /// .batch(&tick, nondet!(/** test */))
2915 /// .defer_tick(); // appears on the second tick
2916 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2917 ///
2918 /// changes_across_ticks.clone().filter_not_in(
2919 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2920 /// ).all_ticks()
2921 /// # }, |mut stream| async move {
2922 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2923 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2924 /// # assert_eq!(stream.next().await.unwrap(), w);
2925 /// # }
2926 /// # }));
2927 /// # }
2928 /// ```
2929 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2930 Stream::new(
2931 self.location.clone(),
2932 HydroNode::DeferTick {
2933 input: Box::new(self.ir_node.into_inner()),
2934 metadata: self
2935 .location
2936 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2937 },
2938 )
2939 }
2940}
2941
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(feature = "deploy")]
    use serde::{Deserialize, Serialize};
    #[cfg(feature = "deploy")]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::NoOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::live_collections::stream::TotalOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    /// Payload sent between processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process to a second process and then to an external client,
    /// checking that the values arrive in their original order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    /// `first()` of the flattened `[1, 2, 3]` yields exactly one element, so the
    /// per-tick `count()` of it is `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    /// `reduce` over an unbounded input keeps its accumulator between inputs:
    /// after sending `1` then `2`, the sampled sums are `1` then `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// An atomic fold over the bounded source `[1, 2, 3]` is visible in every tick:
    /// each batched external input is paired with the full sum `6`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    /// `scan` over an unbounded input keeps its running sum between inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// `enumerate` over an unbounded input keeps counting across inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }

    /// `unique` over an unbounded input remembers previously seen values, so a repeated
    /// `1` is dropped and the next output is `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// The simulator explores batch sizes, so asserting that all three inputs land in the
    /// first batch must fail in some execution.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

    /// Batching a totally ordered input never reorders it.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    /// Batching an unordered input may shuffle elements across batches; the simulator
    /// must find an execution where `{1, 3}` is batched before `{2}`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

    /// Counts the executions explored when batching four unordered inputs: one per
    /// ordered partition of the inputs into batches.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        );
    }

    /// `assume_ordering` on an unordered batch exposes arbitrary orders, so asserting a
    /// specific order must fail in some execution.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }

    /// Counts the executions explored when ordering and batching four unordered inputs.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        );
    }

    /// Counts the executions explored when snapshotting the running count of four
    /// unordered inputs; the final snapshot always sees all four.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let outputs = out_recv.collect::<Vec<_>>().await;
            // `assert_eq!` reports the actual final snapshot (or `None`) on failure.
            assert_eq!(outputs.last(), Some(&4));
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        );
    }
}