hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30use crate::prelude::ManualProof;
31use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
32
33pub mod networking;
34
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds fix how every marker combines under [`MinOrder`]:
/// combining a marker with itself or with [`TotalOrder`] yields the marker itself,
/// while combining it with [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type (the value stored in the
    /// stream's `CollectionKind` metadata).
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum TotalOrder {}

// Lowers the `TotalOrder` marker to its IR-level `StreamOrder` value.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum NoOrder {}

// Lowers the `NoOrder` marker to its IR-level `StreamOrder` value.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
65
/// Helper trait for determining the weakest of two orderings.
///
/// `A: MinOrder<B>` names the weaker of `A` and `B` as `<A as MinOrder<B>>::Min`.
/// The four implementations below form the complete table: the result is
/// [`TotalOrder`] only when both sides are [`TotalOrder`], and [`NoOrder`] otherwise.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// TotalOrder combined with NoOrder -> NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder combined with TotalOrder -> TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder combined with TotalOrder -> NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder combined with NoOrder -> NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
92
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds fix how every marker combines under [`MinRetries`]:
/// combining a marker with itself or with [`ExactlyOnce`] yields the marker itself,
/// while combining it with [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type (the value stored in the
    /// stream's `CollectionKind` metadata).
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum ExactlyOnce {}

// Lowers the `ExactlyOnce` marker to its IR-level `StreamRetry` value.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum AtLeastOnce {}

// Lowers the `AtLeastOnce` marker to its IR-level `StreamRetry` value.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `A: MinRetries<B>` names the weaker of `A` and `B` as `<A as MinRetries<B>>::Min`.
/// The four implementations below form the complete table: the result is
/// [`ExactlyOnce`] only when both sides are [`ExactlyOnce`], and [`AtLeastOnce`] otherwise.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}

// ExactlyOnce combined with AtLeastOnce -> AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce combined with ExactlyOnce -> ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce combined with ExactlyOnce -> AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce combined with AtLeastOnce -> AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
148
149/// Streaming sequence of elements with type `Type`.
150///
151/// This live collection represents a growing sequence of elements, with new elements being
152/// asynchronously appended to the end of the sequence. This can be used to model the arrival
153/// of network input, such as API requests, or streaming ingestion.
154///
155/// By default, all streams have deterministic ordering and each element is materialized exactly
156/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
157/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
158/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
159///
160/// Type Parameters:
161/// - `Type`: the type of elements in the stream
162/// - `Loc`: the location where the stream is being materialized
163/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
164/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
165/// (default is [`TotalOrder`])
166/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
167/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
168pub struct Stream<
169 Type,
170 Loc,
171 Bound: Boundedness = Unbounded,
172 Order: Ordering = TotalOrder,
173 Retry: Retries = ExactlyOnce,
174> {
175 pub(crate) location: Loc,
176 pub(crate) ir_node: RefCell<HydroNode>,
177
178 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
179}
180
181impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
182 for Stream<T, L, Unbounded, O, R>
183where
184 L: Location<'a>,
185{
186 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
187 let new_meta = stream
188 .location
189 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
190
191 Stream {
192 location: stream.location,
193 ir_node: RefCell::new(HydroNode::Cast {
194 inner: Box::new(stream.ir_node.into_inner()),
195 metadata: new_meta,
196 }),
197 _phantom: PhantomData,
198 }
199 }
200}
201
202impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
203 for Stream<T, L, B, NoOrder, R>
204where
205 L: Location<'a>,
206{
207 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
208 stream.weaken_ordering()
209 }
210}
211
212impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
213 for Stream<T, L, B, O, AtLeastOnce>
214where
215 L: Location<'a>,
216{
217 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
218 stream.weaken_retries()
219 }
220}
221
// Exposes tick deferral for bounded, tick-scoped streams through the `DeferTick` trait.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to `Stream::defer_tick`. This presumably resolves to an inherent method
    // defined elsewhere in this file (inherent associated functions take precedence over
    // trait methods), so this does not recurse into itself. Keep the explicit path.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
230
231impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
232 for Stream<T, Tick<L>, Bounded, O, R>
233where
234 L: Location<'a>,
235{
236 type Location = Tick<L>;
237
238 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
239 Stream::new(
240 location.clone(),
241 HydroNode::CycleSource {
242 ident,
243 metadata: location.new_node_metadata(Self::collection_kind()),
244 },
245 )
246 }
247}
248
249impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
250 for Stream<T, Tick<L>, Bounded, O, R>
251where
252 L: Location<'a>,
253{
254 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
255 assert_eq!(
256 Location::id(&self.location),
257 expected_location,
258 "locations do not match"
259 );
260 self.location
261 .flow_state()
262 .borrow_mut()
263 .push_root(HydroRoot::CycleSink {
264 ident,
265 input: Box::new(self.ir_node.into_inner()),
266 op_metadata: HydroIrOpMetadata::new(),
267 });
268 }
269}
270
271impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
272 for Stream<T, L, B, O, R>
273where
274 L: Location<'a> + NoTick,
275{
276 type Location = L;
277
278 fn create_source(ident: syn::Ident, location: L) -> Self {
279 Stream::new(
280 location.clone(),
281 HydroNode::CycleSource {
282 ident,
283 metadata: location.new_node_metadata(Self::collection_kind()),
284 },
285 )
286 }
287}
288
289impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
290 for Stream<T, L, B, O, R>
291where
292 L: Location<'a> + NoTick,
293{
294 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
295 assert_eq!(
296 Location::id(&self.location),
297 expected_location,
298 "locations do not match"
299 );
300 self.location
301 .flow_state()
302 .borrow_mut()
303 .push_root(HydroRoot::CycleSink {
304 ident,
305 input: Box::new(self.ir_node.into_inner()),
306 op_metadata: HydroIrOpMetadata::new(),
307 });
308 }
309}
310
311impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
312where
313 T: Clone,
314 L: Location<'a>,
315{
316 fn clone(&self) -> Self {
317 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
318 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
319 *self.ir_node.borrow_mut() = HydroNode::Tee {
320 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
321 metadata: self.location.new_node_metadata(Self::collection_kind()),
322 };
323 }
324
325 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
326 Stream {
327 location: self.location.clone(),
328 ir_node: HydroNode::Tee {
329 inner: TeeNode(inner.0.clone()),
330 metadata: metadata.clone(),
331 }
332 .into(),
333 _phantom: PhantomData,
334 }
335 } else {
336 unreachable!()
337 }
338 }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
342where
343 L: Location<'a>,
344{
345 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
346 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
347 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
348
349 Stream {
350 location,
351 ir_node: RefCell::new(ir_node),
352 _phantom: PhantomData,
353 }
354 }
355
/// Returns the [`Location`] where this stream is being materialized.
///
/// The result borrows from `self`; clone it if an owned location is needed.
pub fn location(&self) -> &L {
    &self.location
}
360
361 pub(crate) fn collection_kind() -> CollectionKind {
362 CollectionKind::Stream {
363 bound: B::BOUND_KIND,
364 order: O::ORDERING_KIND,
365 retry: R::RETRIES_KIND,
366 element_type: quote_type::<T>().into(),
367 }
368 }
369
370 /// Produces a stream based on invoking `f` on each element.
371 /// If you do not want to modify the stream and instead only want to view
372 /// each item use [`Stream::inspect`] instead.
373 ///
374 /// # Example
375 /// ```rust
376 /// # #[cfg(feature = "deploy")] {
377 /// # use hydro_lang::prelude::*;
378 /// # use futures::StreamExt;
379 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
380 /// let words = process.source_iter(q!(vec!["hello", "world"]));
381 /// words.map(q!(|x| x.to_uppercase()))
382 /// # }, |mut stream| async move {
383 /// # for w in vec!["HELLO", "WORLD"] {
384 /// # assert_eq!(stream.next().await.unwrap(), w);
385 /// # }
386 /// # }));
387 /// # }
388 /// ```
389 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
390 where
391 F: Fn(T) -> U + 'a,
392 {
393 let f = f.splice_fn1_ctx(&self.location).into();
394 Stream::new(
395 self.location.clone(),
396 HydroNode::Map {
397 f,
398 input: Box::new(self.ir_node.into_inner()),
399 metadata: self
400 .location
401 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
402 },
403 )
404 }
405
406 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
407 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
408 /// for the output type `U` must produce items in a **deterministic** order.
409 ///
410 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
411 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
412 ///
413 /// # Example
414 /// ```rust
415 /// # #[cfg(feature = "deploy")] {
416 /// # use hydro_lang::prelude::*;
417 /// # use futures::StreamExt;
418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
419 /// process
420 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
421 /// .flat_map_ordered(q!(|x| x))
422 /// # }, |mut stream| async move {
423 /// // 1, 2, 3, 4
424 /// # for w in (1..5) {
425 /// # assert_eq!(stream.next().await.unwrap(), w);
426 /// # }
427 /// # }));
428 /// # }
429 /// ```
430 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
431 where
432 I: IntoIterator<Item = U>,
433 F: Fn(T) -> I + 'a,
434 {
435 let f = f.splice_fn1_ctx(&self.location).into();
436 Stream::new(
437 self.location.clone(),
438 HydroNode::FlatMap {
439 f,
440 input: Box::new(self.ir_node.into_inner()),
441 metadata: self
442 .location
443 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
444 },
445 )
446 }
447
448 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
449 /// for the output type `U` to produce items in any order.
450 ///
451 /// # Example
452 /// ```rust
453 /// # #[cfg(feature = "deploy")] {
454 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
457 /// process
458 /// .source_iter(q!(vec![
459 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
460 /// std::collections::HashSet::from_iter(vec![3, 4]),
461 /// ]))
462 /// .flat_map_unordered(q!(|x| x))
463 /// # }, |mut stream| async move {
464 /// // 1, 2, 3, 4, but in no particular order
465 /// # let mut results = Vec::new();
466 /// # for w in (1..5) {
467 /// # results.push(stream.next().await.unwrap());
468 /// # }
469 /// # results.sort();
470 /// # assert_eq!(results, vec![1, 2, 3, 4]);
471 /// # }));
472 /// # }
473 /// ```
474 pub fn flat_map_unordered<U, I, F>(
475 self,
476 f: impl IntoQuotedMut<'a, F, L>,
477 ) -> Stream<U, L, B, NoOrder, R>
478 where
479 I: IntoIterator<Item = U>,
480 F: Fn(T) -> I + 'a,
481 {
482 let f = f.splice_fn1_ctx(&self.location).into();
483 Stream::new(
484 self.location.clone(),
485 HydroNode::FlatMap {
486 f,
487 input: Box::new(self.ir_node.into_inner()),
488 metadata: self
489 .location
490 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
491 },
492 )
493 }
494
495 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
496 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
497 ///
498 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
499 /// not deterministic, use [`Stream::flatten_unordered`] instead.
500 ///
501 /// ```rust
502 /// # #[cfg(feature = "deploy")] {
503 /// # use hydro_lang::prelude::*;
504 /// # use futures::StreamExt;
505 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
506 /// process
507 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
508 /// .flatten_ordered()
509 /// # }, |mut stream| async move {
510 /// // 1, 2, 3, 4
511 /// # for w in (1..5) {
512 /// # assert_eq!(stream.next().await.unwrap(), w);
513 /// # }
514 /// # }));
515 /// # }
516 /// ```
517 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
518 where
519 T: IntoIterator<Item = U>,
520 {
521 self.flat_map_ordered(q!(|d| d))
522 }
523
524 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
525 /// for the element type `T` to produce items in any order.
526 ///
527 /// # Example
528 /// ```rust
529 /// # #[cfg(feature = "deploy")] {
530 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
531 /// # use futures::StreamExt;
532 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
533 /// process
534 /// .source_iter(q!(vec![
535 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
536 /// std::collections::HashSet::from_iter(vec![3, 4]),
537 /// ]))
538 /// .flatten_unordered()
539 /// # }, |mut stream| async move {
540 /// // 1, 2, 3, 4, but in no particular order
541 /// # let mut results = Vec::new();
542 /// # for w in (1..5) {
543 /// # results.push(stream.next().await.unwrap());
544 /// # }
545 /// # results.sort();
546 /// # assert_eq!(results, vec![1, 2, 3, 4]);
547 /// # }));
548 /// # }
549 /// ```
550 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
551 where
552 T: IntoIterator<Item = U>,
553 {
554 self.flat_map_unordered(q!(|d| d))
555 }
556
557 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
558 /// `f`, preserving the order of the elements.
559 ///
560 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
561 /// not modify or take ownership of the values. If you need to modify the values while filtering
562 /// use [`Stream::filter_map`] instead.
563 ///
564 /// # Example
565 /// ```rust
566 /// # #[cfg(feature = "deploy")] {
567 /// # use hydro_lang::prelude::*;
568 /// # use futures::StreamExt;
569 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
570 /// process
571 /// .source_iter(q!(vec![1, 2, 3, 4]))
572 /// .filter(q!(|&x| x > 2))
573 /// # }, |mut stream| async move {
574 /// // 3, 4
575 /// # for w in (3..5) {
576 /// # assert_eq!(stream.next().await.unwrap(), w);
577 /// # }
578 /// # }));
579 /// # }
580 /// ```
581 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
582 where
583 F: Fn(&T) -> bool + 'a,
584 {
585 let f = f.splice_fn1_borrow_ctx(&self.location).into();
586 Stream::new(
587 self.location.clone(),
588 HydroNode::Filter {
589 f,
590 input: Box::new(self.ir_node.into_inner()),
591 metadata: self.location.new_node_metadata(Self::collection_kind()),
592 },
593 )
594 }
595
596 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
597 ///
598 /// # Example
599 /// ```rust
600 /// # #[cfg(feature = "deploy")] {
601 /// # use hydro_lang::prelude::*;
602 /// # use futures::StreamExt;
603 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
604 /// process
605 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
606 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
607 /// # }, |mut stream| async move {
608 /// // 1, 2
609 /// # for w in (1..3) {
610 /// # assert_eq!(stream.next().await.unwrap(), w);
611 /// # }
612 /// # }));
613 /// # }
614 /// ```
615 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
616 where
617 F: Fn(T) -> Option<U> + 'a,
618 {
619 let f = f.splice_fn1_ctx(&self.location).into();
620 Stream::new(
621 self.location.clone(),
622 HydroNode::FilterMap {
623 f,
624 input: Box::new(self.ir_node.into_inner()),
625 metadata: self
626 .location
627 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
628 },
629 )
630 }
631
632 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
633 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
634 /// If `other` is an empty [`Optional`], no values will be produced.
635 ///
636 /// # Example
637 /// ```rust
638 /// # #[cfg(feature = "deploy")] {
639 /// # use hydro_lang::prelude::*;
640 /// # use futures::StreamExt;
641 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
642 /// let tick = process.tick();
643 /// let batch = process
644 /// .source_iter(q!(vec![1, 2, 3, 4]))
645 /// .batch(&tick, nondet!(/** test */));
646 /// let count = batch.clone().count(); // `count()` returns a singleton
647 /// batch.cross_singleton(count).all_ticks()
648 /// # }, |mut stream| async move {
649 /// // (1, 4), (2, 4), (3, 4), (4, 4)
650 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
651 /// # assert_eq!(stream.next().await.unwrap(), w);
652 /// # }
653 /// # }));
654 /// # }
655 /// ```
656 pub fn cross_singleton<O2>(
657 self,
658 other: impl Into<Optional<O2, L, Bounded>>,
659 ) -> Stream<(T, O2), L, B, O, R>
660 where
661 O2: Clone,
662 {
663 let other: Optional<O2, L, Bounded> = other.into();
664 check_matching_location(&self.location, &other.location);
665
666 Stream::new(
667 self.location.clone(),
668 HydroNode::CrossSingleton {
669 left: Box::new(self.ir_node.into_inner()),
670 right: Box::new(other.ir_node.into_inner()),
671 metadata: self
672 .location
673 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
674 },
675 )
676 }
677
678 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
679 ///
680 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
681 /// leader of a cluster.
682 ///
683 /// # Example
684 /// ```rust
685 /// # #[cfg(feature = "deploy")] {
686 /// # use hydro_lang::prelude::*;
687 /// # use futures::StreamExt;
688 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
689 /// let tick = process.tick();
690 /// // ticks are lazy by default, forces the second tick to run
691 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
692 ///
693 /// let batch_first_tick = process
694 /// .source_iter(q!(vec![1, 2, 3, 4]))
695 /// .batch(&tick, nondet!(/** test */));
696 /// let batch_second_tick = process
697 /// .source_iter(q!(vec![5, 6, 7, 8]))
698 /// .batch(&tick, nondet!(/** test */))
699 /// .defer_tick(); // appears on the second tick
700 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
701 /// batch_first_tick.chain(batch_second_tick)
702 /// .filter_if_some(some_on_first_tick)
703 /// .all_ticks()
704 /// # }, |mut stream| async move {
705 /// // [1, 2, 3, 4]
706 /// # for w in vec![1, 2, 3, 4] {
707 /// # assert_eq!(stream.next().await.unwrap(), w);
708 /// # }
709 /// # }));
710 /// # }
711 /// ```
712 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
713 self.cross_singleton(signal.map(q!(|_u| ())))
714 .map(q!(|(d, _signal)| d))
715 }
716
717 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
718 ///
719 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
720 /// some local state.
721 ///
722 /// # Example
723 /// ```rust
724 /// # #[cfg(feature = "deploy")] {
725 /// # use hydro_lang::prelude::*;
726 /// # use futures::StreamExt;
727 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
728 /// let tick = process.tick();
729 /// // ticks are lazy by default, forces the second tick to run
730 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
731 ///
732 /// let batch_first_tick = process
733 /// .source_iter(q!(vec![1, 2, 3, 4]))
734 /// .batch(&tick, nondet!(/** test */));
735 /// let batch_second_tick = process
736 /// .source_iter(q!(vec![5, 6, 7, 8]))
737 /// .batch(&tick, nondet!(/** test */))
738 /// .defer_tick(); // appears on the second tick
739 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
740 /// batch_first_tick.chain(batch_second_tick)
741 /// .filter_if_none(some_on_first_tick)
742 /// .all_ticks()
743 /// # }, |mut stream| async move {
744 /// // [5, 6, 7, 8]
745 /// # for w in vec![5, 6, 7, 8] {
746 /// # assert_eq!(stream.next().await.unwrap(), w);
747 /// # }
748 /// # }));
749 /// # }
750 /// ```
751 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
752 self.filter_if_some(
753 other
754 .map(q!(|_| ()))
755 .into_singleton()
756 .filter(q!(|o| o.is_none())),
757 )
758 }
759
760 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
761 /// tupled pairs in a non-deterministic order.
762 ///
763 /// # Example
764 /// ```rust
765 /// # #[cfg(feature = "deploy")] {
766 /// # use hydro_lang::prelude::*;
767 /// # use std::collections::HashSet;
768 /// # use futures::StreamExt;
769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770 /// let tick = process.tick();
771 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
772 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
773 /// stream1.cross_product(stream2)
774 /// # }, |mut stream| async move {
775 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
776 /// # stream.map(|i| assert!(expected.contains(&i)));
777 /// # }));
778 /// # }
779 /// ```
780 pub fn cross_product<T2, O2: Ordering>(
781 self,
782 other: Stream<T2, L, B, O2, R>,
783 ) -> Stream<(T, T2), L, B, NoOrder, R>
784 where
785 T: Clone,
786 T2: Clone,
787 {
788 check_matching_location(&self.location, &other.location);
789
790 Stream::new(
791 self.location.clone(),
792 HydroNode::CrossProduct {
793 left: Box::new(self.ir_node.into_inner()),
794 right: Box::new(other.ir_node.into_inner()),
795 metadata: self
796 .location
797 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
798 },
799 )
800 }
801
802 /// Takes one stream as input and filters out any duplicate occurrences. The output
803 /// contains all unique values from the input.
804 ///
805 /// # Example
806 /// ```rust
807 /// # #[cfg(feature = "deploy")] {
808 /// # use hydro_lang::prelude::*;
809 /// # use futures::StreamExt;
810 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
811 /// let tick = process.tick();
812 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
813 /// # }, |mut stream| async move {
814 /// # for w in vec![1, 2, 3, 4] {
815 /// # assert_eq!(stream.next().await.unwrap(), w);
816 /// # }
817 /// # }));
818 /// # }
819 /// ```
820 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
821 where
822 T: Eq + Hash,
823 {
824 Stream::new(
825 self.location.clone(),
826 HydroNode::Unique {
827 input: Box::new(self.ir_node.into_inner()),
828 metadata: self
829 .location
830 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
831 },
832 )
833 }
834
835 /// Outputs everything in this stream that is *not* contained in the `other` stream.
836 ///
837 /// The `other` stream must be [`Bounded`], since this function will wait until
838 /// all its elements are available before producing any output.
839 /// # Example
840 /// ```rust
841 /// # #[cfg(feature = "deploy")] {
842 /// # use hydro_lang::prelude::*;
843 /// # use futures::StreamExt;
844 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845 /// let tick = process.tick();
846 /// let stream = process
847 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
848 /// .batch(&tick, nondet!(/** test */));
849 /// let batch = process
850 /// .source_iter(q!(vec![1, 2]))
851 /// .batch(&tick, nondet!(/** test */));
852 /// stream.filter_not_in(batch).all_ticks()
853 /// # }, |mut stream| async move {
854 /// # for w in vec![3, 4] {
855 /// # assert_eq!(stream.next().await.unwrap(), w);
856 /// # }
857 /// # }));
858 /// # }
859 /// ```
860 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
861 where
862 T: Eq + Hash,
863 {
864 check_matching_location(&self.location, &other.location);
865
866 Stream::new(
867 self.location.clone(),
868 HydroNode::Difference {
869 pos: Box::new(self.ir_node.into_inner()),
870 neg: Box::new(other.ir_node.into_inner()),
871 metadata: self
872 .location
873 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
874 },
875 )
876 }
877
878 /// An operator which allows you to "inspect" each element of a stream without
879 /// modifying it. The closure `f` is called on a reference to each item. This is
880 /// mainly useful for debugging, and should not be used to generate side-effects.
881 ///
882 /// # Example
883 /// ```rust
884 /// # #[cfg(feature = "deploy")] {
885 /// # use hydro_lang::prelude::*;
886 /// # use futures::StreamExt;
887 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888 /// let nums = process.source_iter(q!(vec![1, 2]));
889 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
890 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
891 /// # }, |mut stream| async move {
892 /// # for w in vec![1, 2] {
893 /// # assert_eq!(stream.next().await.unwrap(), w);
894 /// # }
895 /// # }));
896 /// # }
897 /// ```
898 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
899 where
900 F: Fn(&T) + 'a,
901 {
902 let f = f.splice_fn1_borrow_ctx(&self.location).into();
903
904 Stream::new(
905 self.location.clone(),
906 HydroNode::Inspect {
907 f,
908 input: Box::new(self.ir_node.into_inner()),
909 metadata: self.location.new_node_metadata(Self::collection_kind()),
910 },
911 )
912 }
913
914 /// An operator which allows you to "name" a `HydroNode`.
915 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
916 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
917 {
918 let mut node = self.ir_node.borrow_mut();
919 let metadata = node.metadata_mut();
920 metadata.tag = Some(name.to_string());
921 }
922 self
923 }
924
925 /// Explicitly "casts" the stream to a type with a different ordering
926 /// guarantee. Useful in unsafe code where the ordering cannot be proven
927 /// by the type-system.
928 ///
929 /// # Non-Determinism
930 /// This function is used as an escape hatch, and any mistakes in the
931 /// provided ordering guarantee will propagate into the guarantees
932 /// for the rest of the program.
933 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
934 if O::ORDERING_KIND == O2::ORDERING_KIND {
935 Stream::new(self.location, self.ir_node.into_inner())
936 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
937 // We can always weaken the ordering guarantee
938 Stream::new(
939 self.location.clone(),
940 HydroNode::Cast {
941 inner: Box::new(self.ir_node.into_inner()),
942 metadata: self
943 .location
944 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
945 },
946 )
947 } else {
948 Stream::new(
949 self.location.clone(),
950 HydroNode::ObserveNonDet {
951 inner: Box::new(self.ir_node.into_inner()),
952 trusted: false,
953 metadata: self
954 .location
955 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
956 },
957 )
958 }
959 }
960
961 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
962 // intermediate states will not be revealed
963 fn assume_ordering_trusted_bounded<O2: Ordering>(
964 self,
965 nondet: NonDet,
966 ) -> Stream<T, L, B, O2, R> {
967 if B::BOUNDED {
968 self.assume_ordering_trusted(nondet)
969 } else {
970 self.assume_ordering(nondet)
971 }
972 }
973
974 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
975 // is not observable
976 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
977 self,
978 _nondet: NonDet,
979 ) -> Stream<T, L, B, O2, R> {
980 if O::ORDERING_KIND == O2::ORDERING_KIND {
981 Stream::new(self.location, self.ir_node.into_inner())
982 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
983 // We can always weaken the ordering guarantee
984 Stream::new(
985 self.location.clone(),
986 HydroNode::Cast {
987 inner: Box::new(self.ir_node.into_inner()),
988 metadata: self
989 .location
990 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
991 },
992 )
993 } else {
994 Stream::new(
995 self.location.clone(),
996 HydroNode::ObserveNonDet {
997 inner: Box::new(self.ir_node.into_inner()),
998 trusted: true,
999 metadata: self
1000 .location
1001 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1002 },
1003 )
1004 }
1005 }
1006
1007 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1008 /// which is always safe because that is the weakest possible guarantee.
1009 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1010 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1011 self.assume_ordering::<NoOrder>(nondet)
1012 }
1013
1014 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1015 /// enforcing that `O2` is weaker than the input ordering guarantee.
1016 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1017 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1018 self.assume_ordering::<O2>(nondet)
1019 }
1020
1021 /// Explicitly "casts" the stream to a type with a different retries
1022 /// guarantee. Useful in unsafe code where the lack of retries cannot
1023 /// be proven by the type-system.
1024 ///
1025 /// # Non-Determinism
1026 /// This function is used as an escape hatch, and any mistakes in the
1027 /// provided retries guarantee will propagate into the guarantees
1028 /// for the rest of the program.
1029 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1030 if R::RETRIES_KIND == R2::RETRIES_KIND {
1031 Stream::new(self.location, self.ir_node.into_inner())
1032 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1033 // We can always weaken the retries guarantee
1034 Stream::new(
1035 self.location.clone(),
1036 HydroNode::Cast {
1037 inner: Box::new(self.ir_node.into_inner()),
1038 metadata: self
1039 .location
1040 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1041 },
1042 )
1043 } else {
1044 Stream::new(
1045 self.location.clone(),
1046 HydroNode::ObserveNonDet {
1047 inner: Box::new(self.ir_node.into_inner()),
1048 trusted: false,
1049 metadata: self
1050 .location
1051 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1052 },
1053 )
1054 }
1055 }
1056
1057 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1058 // is not observable
1059 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1060 if R::RETRIES_KIND == R2::RETRIES_KIND {
1061 Stream::new(self.location, self.ir_node.into_inner())
1062 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1063 // We can always weaken the retries guarantee
1064 Stream::new(
1065 self.location.clone(),
1066 HydroNode::Cast {
1067 inner: Box::new(self.ir_node.into_inner()),
1068 metadata: self
1069 .location
1070 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1071 },
1072 )
1073 } else {
1074 Stream::new(
1075 self.location.clone(),
1076 HydroNode::ObserveNonDet {
1077 inner: Box::new(self.ir_node.into_inner()),
1078 trusted: true,
1079 metadata: self
1080 .location
1081 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1082 },
1083 )
1084 }
1085 }
1086
1087 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1088 /// which is always safe because that is the weakest possible guarantee.
1089 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1090 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1091 self.assume_retries::<AtLeastOnce>(nondet)
1092 }
1093
1094 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1095 /// enforcing that `R2` is weaker than the input retries guarantee.
1096 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1097 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1098 self.assume_retries::<R2>(nondet)
1099 }
1100}
1101
1102impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1103where
1104 L: Location<'a>,
1105{
1106 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1107 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1108 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1109 self.assume_retries(
1110 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1111 )
1112 }
1113}
1114
1115impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1116where
1117 L: Location<'a>,
1118{
1119 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1120 ///
1121 /// # Example
1122 /// ```rust
1123 /// # #[cfg(feature = "deploy")] {
1124 /// # use hydro_lang::prelude::*;
1125 /// # use futures::StreamExt;
1126 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1127 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1128 /// # }, |mut stream| async move {
1129 /// // 1, 2, 3
1130 /// # for w in vec![1, 2, 3] {
1131 /// # assert_eq!(stream.next().await.unwrap(), w);
1132 /// # }
1133 /// # }));
1134 /// # }
1135 /// ```
1136 pub fn cloned(self) -> Stream<T, L, B, O, R>
1137 where
1138 T: Clone,
1139 {
1140 self.map(q!(|d| d.clone()))
1141 }
1142}
1143
1144impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1145where
1146 L: Location<'a>,
1147{
1148 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1149 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1150 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1151 ///
1152 /// Depending on the input stream guarantees, the closure may need to be commutative
1153 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1154 ///
1155 /// # Example
1156 /// ```rust
1157 /// # #[cfg(feature = "deploy")] {
1158 /// # use hydro_lang::prelude::*;
1159 /// # use futures::StreamExt;
1160 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1161 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1162 /// words
1163 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1164 /// .into_stream()
1165 /// # }, |mut stream| async move {
1166 /// // "HELLOWORLD"
1167 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1168 /// # }));
1169 /// # }
1170 /// ```
1171 pub fn fold<A, I, F, C, Idemp>(
1172 self,
1173 init: impl IntoQuotedMut<'a, I, L>,
1174 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1175 ) -> Singleton<A, L, B>
1176 where
1177 I: Fn() -> A + 'a,
1178 F: Fn(&mut A, T),
1179 C: ValidCommutativityFor<O>,
1180 Idemp: ValidIdempotenceFor<R>,
1181 {
1182 let init = init.splice_fn0_ctx(&self.location).into();
1183 let comb = comb
1184 .splice_fn2_borrow_mut_ctx_props(&self.location)
1185 .0
1186 .into();
1187
1188 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1189 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1190
1191 let core = HydroNode::Fold {
1192 init,
1193 acc: comb,
1194 input: Box::new(ordered_etc.ir_node.into_inner()),
1195 metadata: ordered_etc
1196 .location
1197 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1198 };
1199
1200 Singleton::new(ordered_etc.location, core)
1201 }
1202
1203 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1204 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1205 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1206 /// reference, so that it can be modified in place.
1207 ///
1208 /// Depending on the input stream guarantees, the closure may need to be commutative
1209 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1210 ///
1211 /// # Example
1212 /// ```rust
1213 /// # #[cfg(feature = "deploy")] {
1214 /// # use hydro_lang::prelude::*;
1215 /// # use futures::StreamExt;
1216 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1217 /// let bools = process.source_iter(q!(vec![false, true, false]));
1218 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1219 /// # }, |mut stream| async move {
1220 /// // true
1221 /// # assert_eq!(stream.next().await.unwrap(), true);
1222 /// # }));
1223 /// # }
1224 /// ```
1225 pub fn reduce<F, C, Idemp>(
1226 self,
1227 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1228 ) -> Optional<T, L, B>
1229 where
1230 F: Fn(&mut T, T) + 'a,
1231 C: ValidCommutativityFor<O>,
1232 Idemp: ValidIdempotenceFor<R>,
1233 {
1234 let f = comb
1235 .splice_fn2_borrow_mut_ctx_props(&self.location)
1236 .0
1237 .into();
1238
1239 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1240 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1241
1242 let core = HydroNode::Reduce {
1243 f,
1244 input: Box::new(ordered_etc.ir_node.into_inner()),
1245 metadata: ordered_etc
1246 .location
1247 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1248 };
1249
1250 Optional::new(ordered_etc.location, core)
1251 }
1252
1253 /// Computes the maximum element in the stream as an [`Optional`], which
1254 /// will be empty until the first element in the input arrives.
1255 ///
1256 /// # Example
1257 /// ```rust
1258 /// # #[cfg(feature = "deploy")] {
1259 /// # use hydro_lang::prelude::*;
1260 /// # use futures::StreamExt;
1261 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262 /// let tick = process.tick();
1263 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1264 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1265 /// batch.max().all_ticks()
1266 /// # }, |mut stream| async move {
1267 /// // 4
1268 /// # assert_eq!(stream.next().await.unwrap(), 4);
1269 /// # }));
1270 /// # }
1271 /// ```
1272 pub fn max(self) -> Optional<T, L, B>
1273 where
1274 T: Ord,
1275 {
1276 self.assume_retries_trusted(nondet!(/** max is idempotent */))
1277 .assume_ordering_trusted_bounded(
1278 nondet!(/** max is commutative, but order affects intermediates */),
1279 )
1280 .reduce(q!(|curr, new| {
1281 if new > *curr {
1282 *curr = new;
1283 }
1284 }))
1285 }
1286
1287 /// Computes the minimum element in the stream as an [`Optional`], which
1288 /// will be empty until the first element in the input arrives.
1289 ///
1290 /// # Example
1291 /// ```rust
1292 /// # #[cfg(feature = "deploy")] {
1293 /// # use hydro_lang::prelude::*;
1294 /// # use futures::StreamExt;
1295 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296 /// let tick = process.tick();
1297 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1298 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1299 /// batch.min().all_ticks()
1300 /// # }, |mut stream| async move {
1301 /// // 1
1302 /// # assert_eq!(stream.next().await.unwrap(), 1);
1303 /// # }));
1304 /// # }
1305 /// ```
1306 pub fn min(self) -> Optional<T, L, B>
1307 where
1308 T: Ord,
1309 {
1310 self.assume_retries_trusted(nondet!(/** min is idempotent */))
1311 .assume_ordering_trusted_bounded(
1312 nondet!(/** max is commutative, but order affects intermediates */),
1313 )
1314 .reduce(q!(|curr, new| {
1315 if new < *curr {
1316 *curr = new;
1317 }
1318 }))
1319 }
1320}
1321
1322impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1323where
1324 L: Location<'a>,
1325{
1326 /// Computes the number of elements in the stream as a [`Singleton`].
1327 ///
1328 /// # Example
1329 /// ```rust
1330 /// # #[cfg(feature = "deploy")] {
1331 /// # use hydro_lang::prelude::*;
1332 /// # use futures::StreamExt;
1333 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1334 /// let tick = process.tick();
1335 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1336 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1337 /// batch.count().all_ticks()
1338 /// # }, |mut stream| async move {
1339 /// // 4
1340 /// # assert_eq!(stream.next().await.unwrap(), 4);
1341 /// # }));
1342 /// # }
1343 /// ```
1344 pub fn count(self) -> Singleton<usize, L, B> {
1345 self.assume_ordering_trusted(nondet!(
1346 /// Order does not affect eventual count, and also does not affect intermediate states.
1347 ))
1348 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1349 }
1350}
1351
1352impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1353where
1354 L: Location<'a>,
1355{
1356 /// Computes the first element in the stream as an [`Optional`], which
1357 /// will be empty until the first element in the input arrives.
1358 ///
1359 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1360 /// re-ordering of elements may cause the first element to change.
1361 ///
1362 /// # Example
1363 /// ```rust
1364 /// # #[cfg(feature = "deploy")] {
1365 /// # use hydro_lang::prelude::*;
1366 /// # use futures::StreamExt;
1367 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1368 /// let tick = process.tick();
1369 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1370 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1371 /// batch.first().all_ticks()
1372 /// # }, |mut stream| async move {
1373 /// // 1
1374 /// # assert_eq!(stream.next().await.unwrap(), 1);
1375 /// # }));
1376 /// # }
1377 /// ```
1378 pub fn first(self) -> Optional<T, L, B> {
1379 self.assume_retries_trusted(nondet!(/** first is idempotent */))
1380 .reduce(q!(|_, _| {}))
1381 }
1382
1383 /// Computes the last element in the stream as an [`Optional`], which
1384 /// will be empty until an element in the input arrives.
1385 ///
1386 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1387 /// re-ordering of elements may cause the last element to change.
1388 ///
1389 /// # Example
1390 /// ```rust
1391 /// # #[cfg(feature = "deploy")] {
1392 /// # use hydro_lang::prelude::*;
1393 /// # use futures::StreamExt;
1394 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1395 /// let tick = process.tick();
1396 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1397 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1398 /// batch.last().all_ticks()
1399 /// # }, |mut stream| async move {
1400 /// // 4
1401 /// # assert_eq!(stream.next().await.unwrap(), 4);
1402 /// # }));
1403 /// # }
1404 /// ```
1405 pub fn last(self) -> Optional<T, L, B> {
1406 self.assume_retries_trusted(nondet!(/** last is idempotent */))
1407 .reduce(q!(|curr, new| *curr = new))
1408 }
1409}
1410
1411impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1412where
1413 L: Location<'a>,
1414{
1415 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1416 ///
1417 /// # Example
1418 /// ```rust
1419 /// # #[cfg(feature = "deploy")] {
1420 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1421 /// # use futures::StreamExt;
1422 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1423 /// let tick = process.tick();
1424 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1425 /// numbers.enumerate()
1426 /// # }, |mut stream| async move {
1427 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1428 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1429 /// # assert_eq!(stream.next().await.unwrap(), w);
1430 /// # }
1431 /// # }));
1432 /// # }
1433 /// ```
1434 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1435 Stream::new(
1436 self.location.clone(),
1437 HydroNode::Enumerate {
1438 input: Box::new(self.ir_node.into_inner()),
1439 metadata: self.location.new_node_metadata(Stream::<
1440 (usize, T),
1441 L,
1442 B,
1443 TotalOrder,
1444 ExactlyOnce,
1445 >::collection_kind()),
1446 },
1447 )
1448 }
1449
1450 /// Collects all the elements of this stream into a single [`Vec`] element.
1451 ///
1452 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1453 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1454 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1455 /// the vector at an arbitrary point in time.
1456 ///
1457 /// # Example
1458 /// ```rust
1459 /// # #[cfg(feature = "deploy")] {
1460 /// # use hydro_lang::prelude::*;
1461 /// # use futures::StreamExt;
1462 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1463 /// let tick = process.tick();
1464 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1465 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1466 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1467 /// # }, |mut stream| async move {
1468 /// // [ vec![1, 2, 3, 4] ]
1469 /// # for w in vec![vec![1, 2, 3, 4]] {
1470 /// # assert_eq!(stream.next().await.unwrap(), w);
1471 /// # }
1472 /// # }));
1473 /// # }
1474 /// ```
1475 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1476 self.fold(
1477 q!(|| vec![]),
1478 q!(|acc, v| {
1479 acc.push(v);
1480 }),
1481 )
1482 }
1483
1484 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1485 /// and emitting each intermediate result.
1486 ///
1487 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1488 /// containing all intermediate accumulated values. The scan operation can also terminate early
1489 /// by returning `None`.
1490 ///
1491 /// The function takes a mutable reference to the accumulator and the current element, and returns
1492 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1493 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1494 ///
1495 /// # Examples
1496 ///
1497 /// Basic usage - running sum:
1498 /// ```rust
1499 /// # #[cfg(feature = "deploy")] {
1500 /// # use hydro_lang::prelude::*;
1501 /// # use futures::StreamExt;
1502 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1503 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1504 /// q!(|| 0),
1505 /// q!(|acc, x| {
1506 /// *acc += x;
1507 /// Some(*acc)
1508 /// }),
1509 /// )
1510 /// # }, |mut stream| async move {
1511 /// // Output: 1, 3, 6, 10
1512 /// # for w in vec![1, 3, 6, 10] {
1513 /// # assert_eq!(stream.next().await.unwrap(), w);
1514 /// # }
1515 /// # }));
1516 /// # }
1517 /// ```
1518 ///
1519 /// Early termination example:
1520 /// ```rust
1521 /// # #[cfg(feature = "deploy")] {
1522 /// # use hydro_lang::prelude::*;
1523 /// # use futures::StreamExt;
1524 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1525 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1526 /// q!(|| 1),
1527 /// q!(|state, x| {
1528 /// *state = *state * x;
1529 /// if *state > 6 {
1530 /// None // Terminate the stream
1531 /// } else {
1532 /// Some(-*state)
1533 /// }
1534 /// }),
1535 /// )
1536 /// # }, |mut stream| async move {
1537 /// // Output: -1, -2, -6
1538 /// # for w in vec![-1, -2, -6] {
1539 /// # assert_eq!(stream.next().await.unwrap(), w);
1540 /// # }
1541 /// # }));
1542 /// # }
1543 /// ```
1544 pub fn scan<A, U, I, F>(
1545 self,
1546 init: impl IntoQuotedMut<'a, I, L>,
1547 f: impl IntoQuotedMut<'a, F, L>,
1548 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1549 where
1550 I: Fn() -> A + 'a,
1551 F: Fn(&mut A, T) -> Option<U> + 'a,
1552 {
1553 let init = init.splice_fn0_ctx(&self.location).into();
1554 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1555
1556 Stream::new(
1557 self.location.clone(),
1558 HydroNode::Scan {
1559 init,
1560 acc: f,
1561 input: Box::new(self.ir_node.into_inner()),
1562 metadata: self.location.new_node_metadata(
1563 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1564 ),
1565 },
1566 )
1567 }
1568}
1569
1570impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1571 /// Produces a new stream that interleaves the elements of the two input streams.
1572 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1573 ///
1574 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1575 /// [`Bounded`], you can use [`Stream::chain`] instead.
1576 ///
1577 /// # Example
1578 /// ```rust
1579 /// # #[cfg(feature = "deploy")] {
1580 /// # use hydro_lang::prelude::*;
1581 /// # use futures::StreamExt;
1582 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1583 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1584 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1585 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1586 /// # }, |mut stream| async move {
1587 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1588 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1589 /// # assert_eq!(stream.next().await.unwrap(), w);
1590 /// # }
1591 /// # }));
1592 /// # }
1593 /// ```
1594 pub fn interleave<O2: Ordering, R2: Retries>(
1595 self,
1596 other: Stream<T, L, Unbounded, O2, R2>,
1597 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1598 where
1599 R: MinRetries<R2>,
1600 {
1601 Stream::new(
1602 self.location.clone(),
1603 HydroNode::Chain {
1604 first: Box::new(self.ir_node.into_inner()),
1605 second: Box::new(other.ir_node.into_inner()),
1606 metadata: self.location.new_node_metadata(Stream::<
1607 T,
1608 L,
1609 Unbounded,
1610 NoOrder,
1611 <R as MinRetries<R2>>::Min,
1612 >::collection_kind()),
1613 },
1614 )
1615 }
1616}
1617
1618impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1619 /// Produces a new stream that combines the elements of the two input streams,
1620 /// preserving the relative order of elements within each input.
1621 ///
1622 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1623 /// [`Bounded`], you can use [`Stream::chain`] instead.
1624 ///
1625 /// # Non-Determinism
1626 /// The order in which elements *across* the two streams will be interleaved is
1627 /// non-deterministic, so the order of elements will vary across runs. If the output order
1628 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1629 /// unordered stream.
1630 ///
1631 /// # Example
1632 /// ```rust
1633 /// # #[cfg(feature = "deploy")] {
1634 /// # use hydro_lang::prelude::*;
1635 /// # use futures::StreamExt;
1636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1637 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1638 /// # process.source_iter(q!(vec![1, 3])).into();
1639 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1640 /// # }, |mut stream| async move {
1641 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1642 /// # for w in vec![1, 3, 2, 4] {
1643 /// # assert_eq!(stream.next().await.unwrap(), w);
1644 /// # }
1645 /// # }));
1646 /// # }
1647 /// ```
1648 pub fn merge_ordered<R2: Retries>(
1649 self,
1650 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1651 _nondet: NonDet,
1652 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1653 where
1654 R: MinRetries<R2>,
1655 {
1656 Stream::new(
1657 self.location.clone(),
1658 HydroNode::Chain {
1659 first: Box::new(self.ir_node.into_inner()),
1660 second: Box::new(other.ir_node.into_inner()),
1661 metadata: self.location.new_node_metadata(Stream::<
1662 T,
1663 L,
1664 Unbounded,
1665 TotalOrder,
1666 <R as MinRetries<R2>>::Min,
1667 >::collection_kind()),
1668 },
1669 )
1670 }
1671}
1672
1673impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1674where
1675 L: Location<'a>,
1676{
1677 /// Produces a new stream that emits the input elements in sorted order.
1678 ///
1679 /// The input stream can have any ordering guarantee, but the output stream
1680 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1681 /// elements in the input stream are available, so it requires the input stream
1682 /// to be [`Bounded`].
1683 ///
1684 /// # Example
1685 /// ```rust
1686 /// # #[cfg(feature = "deploy")] {
1687 /// # use hydro_lang::prelude::*;
1688 /// # use futures::StreamExt;
1689 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1690 /// let tick = process.tick();
1691 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1692 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1693 /// batch.sort().all_ticks()
1694 /// # }, |mut stream| async move {
1695 /// // 1, 2, 3, 4
1696 /// # for w in (1..5) {
1697 /// # assert_eq!(stream.next().await.unwrap(), w);
1698 /// # }
1699 /// # }));
1700 /// # }
1701 /// ```
1702 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1703 where
1704 T: Ord,
1705 {
1706 Stream::new(
1707 self.location.clone(),
1708 HydroNode::Sort {
1709 input: Box::new(self.ir_node.into_inner()),
1710 metadata: self
1711 .location
1712 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1713 },
1714 )
1715 }
1716
1717 /// Produces a new stream that first emits the elements of the `self` stream,
1718 /// and then emits the elements of the `other` stream. The output stream has
1719 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1720 /// [`TotalOrder`] guarantee.
1721 ///
1722 /// Currently, both input streams must be [`Bounded`]. This operator will block
1723 /// on the first stream until all its elements are available. In a future version,
1724 /// we will relax the requirement on the `other` stream.
1725 ///
1726 /// # Example
1727 /// ```rust
1728 /// # #[cfg(feature = "deploy")] {
1729 /// # use hydro_lang::prelude::*;
1730 /// # use futures::StreamExt;
1731 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1732 /// let tick = process.tick();
1733 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1734 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1735 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1736 /// # }, |mut stream| async move {
1737 /// // 2, 3, 4, 5, 1, 2, 3, 4
1738 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1739 /// # assert_eq!(stream.next().await.unwrap(), w);
1740 /// # }
1741 /// # }));
1742 /// # }
1743 /// ```
1744 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1745 self,
1746 other: Stream<T, L, B2, O2, R2>,
1747 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1748 where
1749 O: MinOrder<O2>,
1750 R: MinRetries<R2>,
1751 {
1752 check_matching_location(&self.location, &other.location);
1753
1754 Stream::new(
1755 self.location.clone(),
1756 HydroNode::Chain {
1757 first: Box::new(self.ir_node.into_inner()),
1758 second: Box::new(other.ir_node.into_inner()),
1759 metadata: self.location.new_node_metadata(Stream::<
1760 T,
1761 L,
1762 B2,
1763 <O as MinOrder<O2>>::Min,
1764 <R as MinRetries<R2>>::Min,
1765 >::collection_kind()),
1766 },
1767 )
1768 }
1769
1770 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1771 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1772 /// because this is compiled into a nested loop.
1773 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1774 self,
1775 other: Stream<T2, L, Bounded, O2, R>,
1776 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1777 where
1778 T: Clone,
1779 T2: Clone,
1780 {
1781 check_matching_location(&self.location, &other.location);
1782
1783 Stream::new(
1784 self.location.clone(),
1785 HydroNode::CrossProduct {
1786 left: Box::new(self.ir_node.into_inner()),
1787 right: Box::new(other.ir_node.into_inner()),
1788 metadata: self.location.new_node_metadata(Stream::<
1789 (T, T2),
1790 L,
1791 Bounded,
1792 <O2 as MinOrder<O>>::Min,
1793 R,
1794 >::collection_kind()),
1795 },
1796 )
1797 }
1798
1799 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1800 /// `self` used as the values for *each* key.
1801 ///
1802 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1803 /// values. For example, it can be used to send the same set of elements to several cluster
1804 /// members, if the membership information is available as a [`KeyedSingleton`].
1805 ///
1806 /// # Example
1807 /// ```rust
1808 /// # #[cfg(feature = "deploy")] {
1809 /// # use hydro_lang::prelude::*;
1810 /// # use futures::StreamExt;
1811 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812 /// # let tick = process.tick();
1813 /// let keyed_singleton = // { 1: (), 2: () }
1814 /// # process
1815 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1816 /// # .into_keyed()
1817 /// # .batch(&tick, nondet!(/** test */))
1818 /// # .first();
1819 /// let stream = // [ "a", "b" ]
1820 /// # process
1821 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1822 /// # .batch(&tick, nondet!(/** test */));
1823 /// stream.repeat_with_keys(keyed_singleton)
1824 /// # .entries().all_ticks()
1825 /// # }, |mut stream| async move {
1826 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1827 /// # let mut results = Vec::new();
1828 /// # for _ in 0..4 {
1829 /// # results.push(stream.next().await.unwrap());
1830 /// # }
1831 /// # results.sort();
1832 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1833 /// # }));
1834 /// # }
1835 /// ```
1836 pub fn repeat_with_keys<K, V2>(
1837 self,
1838 keys: KeyedSingleton<K, V2, L, Bounded>,
1839 ) -> KeyedStream<K, T, L, Bounded, O, R>
1840 where
1841 K: Clone,
1842 T: Clone,
1843 {
1844 keys.keys()
1845 .weaken_retries()
1846 .assume_ordering_trusted::<TotalOrder>(
1847 nondet!(/** keyed stream does not depend on ordering of keys */),
1848 )
1849 .cross_product_nested_loop(self)
1850 .into_keyed()
1851 }
1852}
1853
1854impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1855where
1856 L: Location<'a>,
1857{
1858 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1859 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1860 /// by equi-joining the two streams on the key attribute `K`.
1861 ///
1862 /// # Example
1863 /// ```rust
1864 /// # #[cfg(feature = "deploy")] {
1865 /// # use hydro_lang::prelude::*;
1866 /// # use std::collections::HashSet;
1867 /// # use futures::StreamExt;
1868 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1869 /// let tick = process.tick();
1870 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1871 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1872 /// stream1.join(stream2)
1873 /// # }, |mut stream| async move {
1874 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1875 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1876 /// # stream.map(|i| assert!(expected.contains(&i)));
1877 /// # }));
1878 /// # }
1879 pub fn join<V2, O2: Ordering, R2: Retries>(
1880 self,
1881 n: Stream<(K, V2), L, B, O2, R2>,
1882 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1883 where
1884 K: Eq + Hash,
1885 R: MinRetries<R2>,
1886 {
1887 check_matching_location(&self.location, &n.location);
1888
1889 Stream::new(
1890 self.location.clone(),
1891 HydroNode::Join {
1892 left: Box::new(self.ir_node.into_inner()),
1893 right: Box::new(n.ir_node.into_inner()),
1894 metadata: self.location.new_node_metadata(Stream::<
1895 (K, (V1, V2)),
1896 L,
1897 B,
1898 NoOrder,
1899 <R as MinRetries<R2>>::Min,
1900 >::collection_kind()),
1901 },
1902 )
1903 }
1904
1905 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1906 /// computes the anti-join of the items in the input -- i.e. returns
1907 /// unique items in the first input that do not have a matching key
1908 /// in the second input.
1909 ///
1910 /// # Example
1911 /// ```rust
1912 /// # #[cfg(feature = "deploy")] {
1913 /// # use hydro_lang::prelude::*;
1914 /// # use futures::StreamExt;
1915 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1916 /// let tick = process.tick();
1917 /// let stream = process
1918 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1919 /// .batch(&tick, nondet!(/** test */));
1920 /// let batch = process
1921 /// .source_iter(q!(vec![1, 2]))
1922 /// .batch(&tick, nondet!(/** test */));
1923 /// stream.anti_join(batch).all_ticks()
1924 /// # }, |mut stream| async move {
1925 /// # for w in vec![(3, 'c'), (4, 'd')] {
1926 /// # assert_eq!(stream.next().await.unwrap(), w);
1927 /// # }
1928 /// # }));
1929 /// # }
1930 pub fn anti_join<O2: Ordering, R2: Retries>(
1931 self,
1932 n: Stream<K, L, Bounded, O2, R2>,
1933 ) -> Stream<(K, V1), L, B, O, R>
1934 where
1935 K: Eq + Hash,
1936 {
1937 check_matching_location(&self.location, &n.location);
1938
1939 Stream::new(
1940 self.location.clone(),
1941 HydroNode::AntiJoin {
1942 pos: Box::new(self.ir_node.into_inner()),
1943 neg: Box::new(n.ir_node.into_inner()),
1944 metadata: self
1945 .location
1946 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1947 },
1948 )
1949 }
1950}
1951
1952impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1953 Stream<(K, V), L, B, O, R>
1954{
1955 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1956 /// is used as the key and the second element is added to the entries associated with that key.
1957 ///
1958 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1959 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1960 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1961 /// total ordering _within_ each group but no ordering _across_ groups.
1962 ///
1963 /// # Example
1964 /// ```rust
1965 /// # #[cfg(feature = "deploy")] {
1966 /// # use hydro_lang::prelude::*;
1967 /// # use futures::StreamExt;
1968 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1969 /// process
1970 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1971 /// .into_keyed()
1972 /// # .entries()
1973 /// # }, |mut stream| async move {
1974 /// // { 1: [2, 3], 2: [4] }
1975 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1976 /// # assert_eq!(stream.next().await.unwrap(), w);
1977 /// # }
1978 /// # }));
1979 /// # }
1980 /// ```
1981 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1982 KeyedStream::new(
1983 self.location.clone(),
1984 HydroNode::Cast {
1985 inner: Box::new(self.ir_node.into_inner()),
1986 metadata: self
1987 .location
1988 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1989 },
1990 )
1991 }
1992}
1993
1994impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1995where
1996 K: Eq + Hash,
1997 L: Location<'a>,
1998{
1999 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2000 /// # Example
2001 /// ```rust
2002 /// # #[cfg(feature = "deploy")] {
2003 /// # use hydro_lang::prelude::*;
2004 /// # use futures::StreamExt;
2005 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006 /// let tick = process.tick();
2007 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2008 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2009 /// batch.keys().all_ticks()
2010 /// # }, |mut stream| async move {
2011 /// // 1, 2
2012 /// # assert_eq!(stream.next().await.unwrap(), 1);
2013 /// # assert_eq!(stream.next().await.unwrap(), 2);
2014 /// # }));
2015 /// # }
2016 /// ```
2017 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2018 self.into_keyed()
2019 .fold(
2020 q!(|| ()),
2021 q!(
2022 |_, _| {},
2023 commutative = ManualProof(/* values are ignored */),
2024 idempotent = ManualProof(/* values are ignored */)
2025 ),
2026 )
2027 .keys()
2028 }
2029}
2030
2031impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2032where
2033 L: Location<'a> + NoTick,
2034{
2035 /// Returns a stream corresponding to the latest batch of elements being atomically
2036 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2037 /// the order of the input.
2038 ///
2039 /// # Non-Determinism
2040 /// The batch boundaries are non-deterministic and may change across executions.
2041 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2042 Stream::new(
2043 self.location.clone().tick,
2044 HydroNode::Batch {
2045 inner: Box::new(self.ir_node.into_inner()),
2046 metadata: self
2047 .location
2048 .tick
2049 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2050 },
2051 )
2052 }
2053
2054 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2055 /// See [`Stream::atomic`] for more details.
2056 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2057 Stream::new(
2058 self.location.tick.l.clone(),
2059 HydroNode::EndAtomic {
2060 inner: Box::new(self.ir_node.into_inner()),
2061 metadata: self
2062 .location
2063 .tick
2064 .l
2065 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2066 },
2067 )
2068 }
2069}
2070
2071impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2072where
2073 L: Location<'a>,
2074{
2075 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2076 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2077 ///
2078 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2079 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2080 /// argument that declares where the stream will be atomically processed. Batching a stream into
2081 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2082 /// [`Tick`] will introduce asynchrony.
2083 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2084 let out_location = Atomic { tick: tick.clone() };
2085 Stream::new(
2086 out_location.clone(),
2087 HydroNode::BeginAtomic {
2088 inner: Box::new(self.ir_node.into_inner()),
2089 metadata: out_location
2090 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2091 },
2092 )
2093 }
2094
2095 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2096 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2097 /// the order of the input. The output stream will execute in the [`Tick`] that was
2098 /// used to create the atomic section.
2099 ///
2100 /// # Non-Determinism
2101 /// The batch boundaries are non-deterministic and may change across executions.
2102 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2103 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2104 Stream::new(
2105 tick.clone(),
2106 HydroNode::Batch {
2107 inner: Box::new(self.ir_node.into_inner()),
2108 metadata: tick
2109 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2110 },
2111 )
2112 }
2113
2114 /// Given a time interval, returns a stream corresponding to samples taken from the
2115 /// stream roughly at that interval. The output will have elements in the same order
2116 /// as the input, but with arbitrary elements skipped between samples. There is also
2117 /// no guarantee on the exact timing of the samples.
2118 ///
2119 /// # Non-Determinism
2120 /// The output stream is non-deterministic in which elements are sampled, since this
2121 /// is controlled by a clock.
2122 pub fn sample_every(
2123 self,
2124 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2125 nondet: NonDet,
2126 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2127 where
2128 L: NoTick + NoAtomic,
2129 {
2130 let samples = self.location.source_interval(interval, nondet);
2131
2132 let tick = self.location.tick();
2133 self.batch(&tick, nondet)
2134 .filter_if_some(samples.batch(&tick, nondet).first())
2135 .all_ticks()
2136 .weakest_retries()
2137 }
2138
2139 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2140 /// stream has not emitted a value since that duration.
2141 ///
2142 /// # Non-Determinism
2143 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2144 /// samples take place, timeouts may be non-deterministically generated or missed,
2145 /// and the notification of the timeout may be delayed as well. There is also no
2146 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2147 /// detected based on when the next sample is taken.
2148 pub fn timeout(
2149 self,
2150 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2151 nondet: NonDet,
2152 ) -> Optional<(), L, Unbounded>
2153 where
2154 L: NoTick + NoAtomic,
2155 {
2156 let tick = self.location.tick();
2157
2158 let latest_received = self.assume_retries(nondet).fold(
2159 q!(|| None),
2160 q!(
2161 |latest, _| {
2162 *latest = Some(Instant::now());
2163 },
2164 commutative = ManualProof(/* TODO */)
2165 ),
2166 );
2167
2168 latest_received
2169 .snapshot(&tick, nondet)
2170 .filter_map(q!(move |latest_received| {
2171 if let Some(latest_received) = latest_received {
2172 if Instant::now().duration_since(latest_received) > duration {
2173 Some(())
2174 } else {
2175 None
2176 }
2177 } else {
2178 Some(())
2179 }
2180 }))
2181 .latest()
2182 }
2183}
2184
2185impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2186where
2187 L: Location<'a> + NoTick + NoAtomic,
2188 F: Future<Output = T>,
2189{
2190 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2191 /// Future outputs are produced as available, regardless of input arrival order.
2192 ///
2193 /// # Example
2194 /// ```rust
2195 /// # #[cfg(feature = "deploy")] {
2196 /// # use std::collections::HashSet;
2197 /// # use futures::StreamExt;
2198 /// # use hydro_lang::prelude::*;
2199 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2200 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2201 /// .map(q!(|x| async move {
2202 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2203 /// x
2204 /// }))
2205 /// .resolve_futures()
2206 /// # },
2207 /// # |mut stream| async move {
2208 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2209 /// # let mut output = HashSet::new();
2210 /// # for _ in 1..10 {
2211 /// # output.insert(stream.next().await.unwrap());
2212 /// # }
2213 /// # assert_eq!(
2214 /// # output,
2215 /// # HashSet::<i32>::from_iter(1..10)
2216 /// # );
2217 /// # },
2218 /// # ));
2219 /// # }
2220 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2221 Stream::new(
2222 self.location.clone(),
2223 HydroNode::ResolveFutures {
2224 input: Box::new(self.ir_node.into_inner()),
2225 metadata: self
2226 .location
2227 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2228 },
2229 )
2230 }
2231
2232 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2233 /// Future outputs are produced in the same order as the input stream.
2234 ///
2235 /// # Example
2236 /// ```rust
2237 /// # #[cfg(feature = "deploy")] {
2238 /// # use std::collections::HashSet;
2239 /// # use futures::StreamExt;
2240 /// # use hydro_lang::prelude::*;
2241 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2242 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2243 /// .map(q!(|x| async move {
2244 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2245 /// x
2246 /// }))
2247 /// .resolve_futures_ordered()
2248 /// # },
2249 /// # |mut stream| async move {
2250 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2251 /// # let mut output = Vec::new();
2252 /// # for _ in 1..10 {
2253 /// # output.push(stream.next().await.unwrap());
2254 /// # }
2255 /// # assert_eq!(
2256 /// # output,
2257 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2258 /// # );
2259 /// # },
2260 /// # ));
2261 /// # }
2262 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2263 Stream::new(
2264 self.location.clone(),
2265 HydroNode::ResolveFuturesOrdered {
2266 input: Box::new(self.ir_node.into_inner()),
2267 metadata: self
2268 .location
2269 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2270 },
2271 )
2272 }
2273}
2274
2275impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2276where
2277 L: Location<'a> + NoTick,
2278{
2279 /// Executes the provided closure for every element in this stream.
2280 ///
2281 /// Because the closure may have side effects, the stream must have deterministic order
2282 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2283 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2284 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2285 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2286 let f = f.splice_fn1_ctx(&self.location).into();
2287 self.location
2288 .flow_state()
2289 .borrow_mut()
2290 .push_root(HydroRoot::ForEach {
2291 input: Box::new(self.ir_node.into_inner()),
2292 f,
2293 op_metadata: HydroIrOpMetadata::new(),
2294 });
2295 }
2296
2297 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2298 /// TCP socket to some other server. You should _not_ use this API for interacting with
2299 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2300 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2301 /// interaction with asynchronous sinks.
2302 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2303 where
2304 S: 'a + futures::Sink<T> + Unpin,
2305 {
2306 self.location
2307 .flow_state()
2308 .borrow_mut()
2309 .push_root(HydroRoot::DestSink {
2310 sink: sink.splice_typed_ctx(&self.location).into(),
2311 input: Box::new(self.ir_node.into_inner()),
2312 op_metadata: HydroIrOpMetadata::new(),
2313 });
2314 }
2315}
2316
2317impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2318where
2319 L: Location<'a>,
2320{
2321 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2322 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2323 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2324 Stream::new(
2325 self.location.outer().clone(),
2326 HydroNode::YieldConcat {
2327 inner: Box::new(self.ir_node.into_inner()),
2328 metadata: self
2329 .location
2330 .outer()
2331 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2332 },
2333 )
2334 }
2335
2336 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2337 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2338 ///
2339 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2340 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2341 /// stream's [`Tick`] context.
2342 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2343 let out_location = Atomic {
2344 tick: self.location.clone(),
2345 };
2346
2347 Stream::new(
2348 out_location.clone(),
2349 HydroNode::YieldConcat {
2350 inner: Box::new(self.ir_node.into_inner()),
2351 metadata: out_location
2352 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2353 },
2354 )
2355 }
2356
2357 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2358 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2359 /// input.
2360 ///
2361 /// This API is particularly useful for stateful computation on batches of data, such as
2362 /// maintaining an accumulated state that is up to date with the current batch.
2363 ///
2364 /// # Example
2365 /// ```rust
2366 /// # #[cfg(feature = "deploy")] {
2367 /// # use hydro_lang::prelude::*;
2368 /// # use futures::StreamExt;
2369 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2370 /// let tick = process.tick();
2371 /// # // ticks are lazy by default, forces the second tick to run
2372 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2373 /// # let batch_first_tick = process
2374 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2375 /// # .batch(&tick, nondet!(/** test */));
2376 /// # let batch_second_tick = process
2377 /// # .source_iter(q!(vec![5, 6, 7]))
2378 /// # .batch(&tick, nondet!(/** test */))
2379 /// # .defer_tick(); // appears on the second tick
2380 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2381 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2382 ///
2383 /// input.batch(&tick, nondet!(/** test */))
2384 /// .across_ticks(|s| s.count()).all_ticks()
2385 /// # }, |mut stream| async move {
2386 /// // [4, 7]
2387 /// assert_eq!(stream.next().await.unwrap(), 4);
2388 /// assert_eq!(stream.next().await.unwrap(), 7);
2389 /// # }));
2390 /// # }
2391 /// ```
2392 pub fn across_ticks<Out: BatchAtomic>(
2393 self,
2394 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2395 ) -> Out::Batched {
2396 thunk(self.all_ticks_atomic()).batched_atomic()
2397 }
2398
2399 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2400 /// always has the elements of `self` at tick `T - 1`.
2401 ///
2402 /// At tick `0`, the output stream is empty, since there is no previous tick.
2403 ///
2404 /// This operator enables stateful iterative processing with ticks, by sending data from one
2405 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2406 ///
2407 /// # Example
2408 /// ```rust
2409 /// # #[cfg(feature = "deploy")] {
2410 /// # use hydro_lang::prelude::*;
2411 /// # use futures::StreamExt;
2412 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2413 /// let tick = process.tick();
2414 /// // ticks are lazy by default, forces the second tick to run
2415 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2416 ///
2417 /// let batch_first_tick = process
2418 /// .source_iter(q!(vec![1, 2, 3, 4]))
2419 /// .batch(&tick, nondet!(/** test */));
2420 /// let batch_second_tick = process
2421 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2422 /// .batch(&tick, nondet!(/** test */))
2423 /// .defer_tick(); // appears on the second tick
2424 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2425 ///
2426 /// changes_across_ticks.clone().filter_not_in(
2427 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2428 /// ).all_ticks()
2429 /// # }, |mut stream| async move {
2430 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2431 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2432 /// # assert_eq!(stream.next().await.unwrap(), w);
2433 /// # }
2434 /// # }));
2435 /// # }
2436 /// ```
2437 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2438 Stream::new(
2439 self.location.clone(),
2440 HydroNode::DeferTick {
2441 input: Box::new(self.ir_node.into_inner()),
2442 metadata: self
2443 .location
2444 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2445 },
2446 )
2447 }
2448}
2449
2450#[cfg(test)]
2451mod tests {
2452 #[cfg(feature = "deploy")]
2453 use futures::{SinkExt, StreamExt};
2454 #[cfg(feature = "deploy")]
2455 use hydro_deploy::Deployment;
2456 #[cfg(feature = "deploy")]
2457 use serde::{Deserialize, Serialize};
2458 #[cfg(feature = "deploy")]
2459 use stageleft::q;
2460
2461 #[cfg(any(feature = "deploy", feature = "sim"))]
2462 use crate::compile::builder::FlowBuilder;
2463 #[cfg(feature = "deploy")]
2464 use crate::live_collections::sliced::sliced;
2465 #[cfg(feature = "deploy")]
2466 use crate::live_collections::stream::ExactlyOnce;
2467 #[cfg(feature = "sim")]
2468 use crate::live_collections::stream::NoOrder;
2469 #[cfg(any(feature = "deploy", feature = "sim"))]
2470 use crate::live_collections::stream::TotalOrder;
2471 #[cfg(any(feature = "deploy", feature = "sim"))]
2472 use crate::location::Location;
2473 #[cfg(any(feature = "deploy", feature = "sim"))]
2474 use crate::nondet::nondet;
2475
2476 mod backtrace_chained_ops;
2477
    /// Type tag for the first (sending) process in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Type tag for the second (receiving) process, also used for the external client.
    #[cfg(feature = "deploy")]
    struct P2 {}

    /// Bincode-serialized payload sent from `P1` to `P2` in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2488
    /// Sends `0..10` from `P1` to `P2` over TCP (as bincode-encoded `SendOverNetwork`
    /// values), forwards them to an external client, and checks that the client receives
    /// them in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // Elements must arrive in the same order they were produced.
        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2523
    /// Checks that `first()` on a three-element tick-local stream yields exactly one element,
    /// so that `count()` reports `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2557
    /// Checks that `reduce` over an unbounded external input accumulates across inputs:
    /// after sending `1` and then `2`, the sampled sums are `1` and then `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        // The second sample includes the first input, so reduce state was kept.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2591
    /// Checks that a top-level bounded singleton (the fold of `[1, 2, 3]`, i.e. `6`) is
    /// paired with every element of an unbounded input by `cross_singleton`, not just the
    /// first one.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2629
    /// Inside `sliced!`, checks that the top-level bounded `reduce` result is seen as exactly
    /// one element per slice: every input is paired with a count of `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2667
    /// Same as `top_level_bounded_reduce_cardinality`, but converts the reduce result with
    /// `into_singleton()` first. Every input must still be paired with a count of `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2705
    /// Checks that an atomic fold of `[1, 2, 3]`, snapshotted into a tick, yields the full
    /// accumulated value `6` in every tick that processes input, not only in the first.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2748
    /// Checks that `scan` over an unbounded input keeps its accumulator across inputs,
    /// emitting the running sums `1` and then `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2787
    /// Checks that `enumerate` over an unbounded input keeps its counter across inputs,
    /// assigning indices `0` and then `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
2818
2819 #[cfg(feature = "deploy")]
2820 #[tokio::test]
2821 async fn unbounded_unique_remembers_state() {
2822 let mut deployment = Deployment::new();
2823
2824 let flow = FlowBuilder::new();
2825 let node = flow.process::<()>();
2826 let external = flow.external::<()>();
2827
2828 let (input_port, input) =
2829 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2830 let out = input.unique().send_bincode_external(&external);
2831
2832 let nodes = flow
2833 .with_process(&node, deployment.Localhost())
2834 .with_external(&external, deployment.Localhost())
2835 .deploy(&mut deployment);
2836
2837 deployment.deploy().await.unwrap();
2838
2839 let mut external_in = nodes.connect(input_port).await;
2840 let mut external_out = nodes.connect(out).await;
2841
2842 deployment.start().await.unwrap();
2843
2844 external_in.send(1).await.unwrap();
2845 assert_eq!(external_out.next().await.unwrap(), 1);
2846
2847 external_in.send(2).await.unwrap();
2848 assert_eq!(external_out.next().await.unwrap(), 2);
2849
2850 external_in.send(1).await.unwrap();
2851 external_in.send(3).await.unwrap();
2852 assert_eq!(external_out.next().await.unwrap(), 3);
2853 }
2854
2855 #[cfg(feature = "sim")]
2856 #[test]
2857 #[should_panic]
2858 fn sim_batch_nondet_size() {
2859 let flow = FlowBuilder::new();
2860 let node = flow.process::<()>();
2861
2862 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2863
2864 let tick = node.tick();
2865 let out_recv = input
2866 .batch(&tick, nondet!(/** test */))
2867 .count()
2868 .all_ticks()
2869 .sim_output();
2870
2871 flow.sim().exhaustive(async || {
2872 in_send.send(());
2873 in_send.send(());
2874 in_send.send(());
2875
2876 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
2877 });
2878 }
2879
2880 #[cfg(feature = "sim")]
2881 #[test]
2882 fn sim_batch_preserves_order() {
2883 let flow = FlowBuilder::new();
2884 let node = flow.process::<()>();
2885
2886 let (in_send, input) = node.sim_input();
2887
2888 let tick = node.tick();
2889 let out_recv = input
2890 .batch(&tick, nondet!(/** test */))
2891 .all_ticks()
2892 .sim_output();
2893
2894 flow.sim().exhaustive(async || {
2895 in_send.send(1);
2896 in_send.send(2);
2897 in_send.send(3);
2898
2899 out_recv.assert_yields_only([1, 2, 3]).await;
2900 });
2901 }
2902
2903 #[cfg(feature = "sim")]
2904 #[test]
2905 #[should_panic]
2906 fn sim_batch_unordered_shuffles() {
2907 let flow = FlowBuilder::new();
2908 let node = flow.process::<()>();
2909
2910 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2911
2912 let tick = node.tick();
2913 let batch = input.batch(&tick, nondet!(/** test */));
2914 let out_recv = batch
2915 .clone()
2916 .min()
2917 .zip(batch.max())
2918 .all_ticks()
2919 .sim_output();
2920
2921 flow.sim().exhaustive(async || {
2922 in_send.send_many_unordered([1, 2, 3]);
2923
2924 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
2925 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
2926 }
2927 });
2928 }
2929
2930 #[cfg(feature = "sim")]
2931 #[test]
2932 fn sim_batch_unordered_shuffles_count() {
2933 let flow = FlowBuilder::new();
2934 let node = flow.process::<()>();
2935
2936 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2937
2938 let tick = node.tick();
2939 let batch = input.batch(&tick, nondet!(/** test */));
2940 let out_recv = batch.all_ticks().sim_output();
2941
2942 let instance_count = flow.sim().exhaustive(async || {
2943 in_send.send_many_unordered([1, 2, 3, 4]);
2944 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
2945 });
2946
2947 assert_eq!(
2948 instance_count,
2949 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
2950 )
2951 }
2952
2953 #[cfg(feature = "sim")]
2954 #[test]
2955 #[should_panic]
2956 fn sim_observe_order_batched() {
2957 let flow = FlowBuilder::new();
2958 let node = flow.process::<()>();
2959
2960 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2961
2962 let tick = node.tick();
2963 let batch = input.batch(&tick, nondet!(/** test */));
2964 let out_recv = batch
2965 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2966 .all_ticks()
2967 .sim_output();
2968
2969 flow.sim().exhaustive(async || {
2970 in_send.send_many_unordered([1, 2, 3, 4]);
2971 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
2972 });
2973 }
2974
2975 #[cfg(feature = "sim")]
2976 #[test]
2977 fn sim_observe_order_batched_count() {
2978 let flow = FlowBuilder::new();
2979 let node = flow.process::<()>();
2980
2981 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2982
2983 let tick = node.tick();
2984 let batch = input.batch(&tick, nondet!(/** test */));
2985 let out_recv = batch
2986 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2987 .all_ticks()
2988 .sim_output();
2989
2990 let instance_count = flow.sim().exhaustive(async || {
2991 in_send.send_many_unordered([1, 2, 3, 4]);
2992 let _ = out_recv.collect::<Vec<_>>().await;
2993 });
2994
2995 assert_eq!(
2996 instance_count,
2997 192 // 4! * 2^{4 - 1}
2998 )
2999 }
3000
3001 #[cfg(feature = "sim")]
3002 #[test]
3003 fn sim_unordered_count_instance_count() {
3004 let flow = FlowBuilder::new();
3005 let node = flow.process::<()>();
3006
3007 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3008
3009 let tick = node.tick();
3010 let out_recv = input
3011 .count()
3012 .snapshot(&tick, nondet!(/** test */))
3013 .all_ticks()
3014 .sim_output();
3015
3016 let instance_count = flow.sim().exhaustive(async || {
3017 in_send.send_many_unordered([1, 2, 3, 4]);
3018 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3019 });
3020
3021 assert_eq!(
3022 instance_count,
3023 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3024 )
3025 }
3026}