hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds fix how a marker combines with other markers through [`MinOrder`]:
/// combining with itself or with [`TotalOrder`] yields the marker itself, while combining
/// with [`NoOrder`] always yields [`NoOrder`]. The trait is sealed (`#[sealed::sealed]`), so it
/// cannot be implemented outside this crate.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it has no values; it is used purely as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // Mirrors this marker into the runtime `StreamOrder` value stored in a stream's
    // `CollectionKind` (see `Stream::collection_kind`).
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Like [`TotalOrder`], this enum has no variants and is used purely as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // Mirrors this marker into the runtime `StreamOrder` value stored in a stream's
    // `CollectionKind` (see `Stream::collection_kind`).
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
62
/// Helper trait for determining the weakest of two orderings.
///
/// For markers `A` and `B`, `<A as MinOrder<B>>::Min` is the weaker of the two. The four
/// impls below form the complete table: the result is [`TotalOrder`] only when both sides
/// are [`TotalOrder`], and [`NoOrder`] otherwise.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// min(TotalOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// min(TotalOrder, TotalOrder) = TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// min(NoOrder, TotalOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// min(NoOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
89
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds fix how a marker combines with other markers through
/// [`MinRetries`]: combining with itself or with [`ExactlyOnce`] yields the marker itself,
/// while combining with [`AtLeastOnce`] always yields [`AtLeastOnce`]. The trait is sealed
/// (`#[sealed::sealed]`), so it cannot be implemented outside this crate.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it has no values; it is used purely as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // Mirrors this marker into the runtime `StreamRetry` value stored in a stream's
    // `CollectionKind` (see `Stream::collection_kind`).
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// Like [`ExactlyOnce`], this enum has no variants and is used purely as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // Mirrors this marker into the runtime `StreamRetry` value stored in a stream's
    // `CollectionKind` (see `Stream::collection_kind`).
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
118
/// Helper trait for determining the weakest of two retry guarantees.
///
/// For markers `A` and `B`, `<A as MinRetries<B>>::Min` is the weaker of the two. The four
/// impls below form the complete table: the result is [`ExactlyOnce`] only when both sides
/// are [`ExactlyOnce`], and [`AtLeastOnce`] otherwise.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}

// min(ExactlyOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// min(ExactlyOnce, ExactlyOnce) = ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// min(AtLeastOnce, ExactlyOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// min(AtLeastOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream's elements. `Clone` relies on the interior
    /// mutability of the `RefCell` to rewrite this node into a shared `Tee` through `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the type parameters, which have no runtime representation of their own.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179 for Stream<T, L, Unbounded, O, R>
180where
181 L: Location<'a>,
182{
183 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184 Stream {
185 location: stream.location,
186 ir_node: stream.ir_node,
187 _phantom: PhantomData,
188 }
189 }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193 for Stream<T, L, B, NoOrder, R>
194where
195 L: Location<'a>,
196{
197 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198 Stream {
199 location: stream.location,
200 ir_node: stream.ir_node,
201 _phantom: PhantomData,
202 }
203 }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207 for Stream<T, L, B, O, AtLeastOnce>
208where
209 L: Location<'a>,
210{
211 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212 Stream {
213 location: stream.location,
214 ir_node: stream.ir_node,
215 _phantom: PhantomData,
216 }
217 }
218}
219
/// Implements [`DeferTick`] for bounded streams whose location is a [`Tick`].
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Calls the `defer_tick` associated function through a path. NOTE(review): this is
        // presumably the inherent `Stream::defer_tick` defined elsewhere in this file; inherent
        // items take precedence over this trait method, which is what keeps this call from
        // recursing. Confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230 for Stream<T, Tick<L>, Bounded, O, R>
231where
232 L: Location<'a>,
233{
234 type Location = Tick<L>;
235
236 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237 Stream::new(
238 location.clone(),
239 HydroNode::CycleSource {
240 ident,
241 metadata: location.new_node_metadata(Self::collection_kind()),
242 },
243 )
244 }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248 for Stream<T, Tick<L>, Bounded, O, R>
249where
250 L: Location<'a>,
251{
252 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253 assert_eq!(
254 Location::id(&self.location),
255 expected_location,
256 "locations do not match"
257 );
258 self.location
259 .flow_state()
260 .borrow_mut()
261 .push_root(HydroRoot::CycleSink {
262 ident,
263 input: Box::new(self.ir_node.into_inner()),
264 op_metadata: HydroIrOpMetadata::new(),
265 });
266 }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270 for Stream<T, L, B, O, R>
271where
272 L: Location<'a> + NoTick,
273{
274 type Location = L;
275
276 fn create_source(ident: syn::Ident, location: L) -> Self {
277 Stream::new(
278 location.clone(),
279 HydroNode::CycleSource {
280 ident,
281 metadata: location.new_node_metadata(Self::collection_kind()),
282 },
283 )
284 }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288 for Stream<T, L, B, O, R>
289where
290 L: Location<'a> + NoTick,
291{
292 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293 assert_eq!(
294 Location::id(&self.location),
295 expected_location,
296 "locations do not match"
297 );
298 self.location
299 .flow_state()
300 .borrow_mut()
301 .push_root(HydroRoot::CycleSink {
302 ident,
303 input: Box::new(self.ir_node.into_inner()),
304 op_metadata: HydroIrOpMetadata::new(),
305 });
306 }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311 T: Clone,
312 L: Location<'a>,
313{
314 fn clone(&self) -> Self {
315 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317 *self.ir_node.borrow_mut() = HydroNode::Tee {
318 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319 metadata: self.location.new_node_metadata(Self::collection_kind()),
320 };
321 }
322
323 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324 Stream {
325 location: self.location.clone(),
326 ir_node: HydroNode::Tee {
327 inner: TeeNode(inner.0.clone()),
328 metadata: metadata.clone(),
329 }
330 .into(),
331 _phantom: PhantomData,
332 }
333 } else {
334 unreachable!()
335 }
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341 L: Location<'a>,
342{
343 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347 Stream {
348 location,
349 ir_node: RefCell::new(ir_node),
350 _phantom: PhantomData,
351 }
352 }
353
354 /// Returns the [`Location`] where this stream is being materialized.
355 pub fn location(&self) -> &L {
356 &self.location
357 }
358
359 pub(crate) fn collection_kind() -> CollectionKind {
360 CollectionKind::Stream {
361 bound: B::BOUND_KIND,
362 order: O::ORDERING_KIND,
363 retry: R::RETRIES_KIND,
364 element_type: stageleft::quote_type::<T>().into(),
365 }
366 }
367
368 /// Produces a stream based on invoking `f` on each element.
369 /// If you do not want to modify the stream and instead only want to view
370 /// each item use [`Stream::inspect`] instead.
371 ///
372 /// # Example
373 /// ```rust
374 /// # use hydro_lang::prelude::*;
375 /// # use futures::StreamExt;
376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377 /// let words = process.source_iter(q!(vec!["hello", "world"]));
378 /// words.map(q!(|x| x.to_uppercase()))
379 /// # }, |mut stream| async move {
380 /// # for w in vec!["HELLO", "WORLD"] {
381 /// # assert_eq!(stream.next().await.unwrap(), w);
382 /// # }
383 /// # }));
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386 where
387 F: Fn(T) -> U + 'a,
388 {
389 let f = f.splice_fn1_ctx(&self.location).into();
390 Stream::new(
391 self.location.clone(),
392 HydroNode::Map {
393 f,
394 input: Box::new(self.ir_node.into_inner()),
395 metadata: self
396 .location
397 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398 },
399 )
400 }
401
402 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404 /// for the output type `U` must produce items in a **deterministic** order.
405 ///
406 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::prelude::*;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414 /// process
415 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416 /// .flat_map_ordered(q!(|x| x))
417 /// # }, |mut stream| async move {
418 /// // 1, 2, 3, 4
419 /// # for w in (1..5) {
420 /// # assert_eq!(stream.next().await.unwrap(), w);
421 /// # }
422 /// # }));
423 /// ```
424 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425 where
426 I: IntoIterator<Item = U>,
427 F: Fn(T) -> I + 'a,
428 {
429 let f = f.splice_fn1_ctx(&self.location).into();
430 Stream::new(
431 self.location.clone(),
432 HydroNode::FlatMap {
433 f,
434 input: Box::new(self.ir_node.into_inner()),
435 metadata: self
436 .location
437 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438 },
439 )
440 }
441
442 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443 /// for the output type `U` to produce items in any order.
444 ///
445 /// # Example
446 /// ```rust
447 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448 /// # use futures::StreamExt;
449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450 /// process
451 /// .source_iter(q!(vec![
452 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453 /// std::collections::HashSet::from_iter(vec![3, 4]),
454 /// ]))
455 /// .flat_map_unordered(q!(|x| x))
456 /// # }, |mut stream| async move {
457 /// // 1, 2, 3, 4, but in no particular order
458 /// # let mut results = Vec::new();
459 /// # for w in (1..5) {
460 /// # results.push(stream.next().await.unwrap());
461 /// # }
462 /// # results.sort();
463 /// # assert_eq!(results, vec![1, 2, 3, 4]);
464 /// # }));
465 /// ```
466 pub fn flat_map_unordered<U, I, F>(
467 self,
468 f: impl IntoQuotedMut<'a, F, L>,
469 ) -> Stream<U, L, B, NoOrder, R>
470 where
471 I: IntoIterator<Item = U>,
472 F: Fn(T) -> I + 'a,
473 {
474 let f = f.splice_fn1_ctx(&self.location).into();
475 Stream::new(
476 self.location.clone(),
477 HydroNode::FlatMap {
478 f,
479 input: Box::new(self.ir_node.into_inner()),
480 metadata: self
481 .location
482 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483 },
484 )
485 }
486
487 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
488 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
489 ///
490 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
491 /// not deterministic, use [`Stream::flatten_unordered`] instead.
492 ///
493 /// ```rust
494 /// # use hydro_lang::prelude::*;
495 /// # use futures::StreamExt;
496 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
497 /// process
498 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
499 /// .flatten_ordered()
500 /// # }, |mut stream| async move {
501 /// // 1, 2, 3, 4
502 /// # for w in (1..5) {
503 /// # assert_eq!(stream.next().await.unwrap(), w);
504 /// # }
505 /// # }));
506 /// ```
507 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
508 where
509 T: IntoIterator<Item = U>,
510 {
511 self.flat_map_ordered(q!(|d| d))
512 }
513
514 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
515 /// for the element type `T` to produce items in any order.
516 ///
517 /// # Example
518 /// ```rust
519 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520 /// # use futures::StreamExt;
521 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
522 /// process
523 /// .source_iter(q!(vec![
524 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
525 /// std::collections::HashSet::from_iter(vec![3, 4]),
526 /// ]))
527 /// .flatten_unordered()
528 /// # }, |mut stream| async move {
529 /// // 1, 2, 3, 4, but in no particular order
530 /// # let mut results = Vec::new();
531 /// # for w in (1..5) {
532 /// # results.push(stream.next().await.unwrap());
533 /// # }
534 /// # results.sort();
535 /// # assert_eq!(results, vec![1, 2, 3, 4]);
536 /// # }));
537 /// ```
538 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
539 where
540 T: IntoIterator<Item = U>,
541 {
542 self.flat_map_unordered(q!(|d| d))
543 }
544
545 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546 /// `f`, preserving the order of the elements.
547 ///
548 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549 /// not modify or take ownership of the values. If you need to modify the values while filtering
550 /// use [`Stream::filter_map`] instead.
551 ///
552 /// # Example
553 /// ```rust
554 /// # use hydro_lang::prelude::*;
555 /// # use futures::StreamExt;
556 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557 /// process
558 /// .source_iter(q!(vec![1, 2, 3, 4]))
559 /// .filter(q!(|&x| x > 2))
560 /// # }, |mut stream| async move {
561 /// // 3, 4
562 /// # for w in (3..5) {
563 /// # assert_eq!(stream.next().await.unwrap(), w);
564 /// # }
565 /// # }));
566 /// ```
567 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568 where
569 F: Fn(&T) -> bool + 'a,
570 {
571 let f = f.splice_fn1_borrow_ctx(&self.location).into();
572 Stream::new(
573 self.location.clone(),
574 HydroNode::Filter {
575 f,
576 input: Box::new(self.ir_node.into_inner()),
577 metadata: self.location.new_node_metadata(Self::collection_kind()),
578 },
579 )
580 }
581
582 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583 ///
584 /// # Example
585 /// ```rust
586 /// # use hydro_lang::prelude::*;
587 /// # use futures::StreamExt;
588 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589 /// process
590 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
591 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
592 /// # }, |mut stream| async move {
593 /// // 1, 2
594 /// # for w in (1..3) {
595 /// # assert_eq!(stream.next().await.unwrap(), w);
596 /// # }
597 /// # }));
598 /// ```
599 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600 where
601 F: Fn(T) -> Option<U> + 'a,
602 {
603 let f = f.splice_fn1_ctx(&self.location).into();
604 Stream::new(
605 self.location.clone(),
606 HydroNode::FilterMap {
607 f,
608 input: Box::new(self.ir_node.into_inner()),
609 metadata: self
610 .location
611 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612 },
613 )
614 }
615
616 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617 /// where `x` is the final value of `other`, a bounded [`Singleton`].
618 ///
619 /// # Example
620 /// ```rust
621 /// # use hydro_lang::prelude::*;
622 /// # use futures::StreamExt;
623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
624 /// let tick = process.tick();
625 /// let batch = process
626 /// .source_iter(q!(vec![1, 2, 3, 4]))
627 /// .batch(&tick, nondet!(/** test */));
628 /// let count = batch.clone().count(); // `count()` returns a singleton
629 /// batch.cross_singleton(count).all_ticks()
630 /// # }, |mut stream| async move {
631 /// // (1, 4), (2, 4), (3, 4), (4, 4)
632 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
633 /// # assert_eq!(stream.next().await.unwrap(), w);
634 /// # }
635 /// # }));
636 /// ```
637 pub fn cross_singleton<O2>(
638 self,
639 other: impl Into<Optional<O2, L, Bounded>>,
640 ) -> Stream<(T, O2), L, B, O, R>
641 where
642 O2: Clone,
643 {
644 let other: Optional<O2, L, Bounded> = other.into();
645 check_matching_location(&self.location, &other.location);
646
647 Stream::new(
648 self.location.clone(),
649 HydroNode::CrossSingleton {
650 left: Box::new(self.ir_node.into_inner()),
651 right: Box::new(other.ir_node.into_inner()),
652 metadata: self
653 .location
654 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
655 },
656 )
657 }
658
659 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
660 ///
661 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
662 /// leader of a cluster.
663 ///
664 /// # Example
665 /// ```rust
666 /// # use hydro_lang::prelude::*;
667 /// # use futures::StreamExt;
668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669 /// let tick = process.tick();
670 /// // ticks are lazy by default, forces the second tick to run
671 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672 ///
673 /// let batch_first_tick = process
674 /// .source_iter(q!(vec![1, 2, 3, 4]))
675 /// .batch(&tick, nondet!(/** test */));
676 /// let batch_second_tick = process
677 /// .source_iter(q!(vec![5, 6, 7, 8]))
678 /// .batch(&tick, nondet!(/** test */))
679 /// .defer_tick(); // appears on the second tick
680 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
681 /// batch_first_tick.chain(batch_second_tick)
682 /// .filter_if_some(some_on_first_tick)
683 /// .all_ticks()
684 /// # }, |mut stream| async move {
685 /// // [1, 2, 3, 4]
686 /// # for w in vec![1, 2, 3, 4] {
687 /// # assert_eq!(stream.next().await.unwrap(), w);
688 /// # }
689 /// # }));
690 /// ```
691 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
692 self.cross_singleton(signal.map(q!(|_u| ())))
693 .map(q!(|(d, _signal)| d))
694 }
695
696 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
697 ///
698 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
699 /// some local state.
700 ///
701 /// # Example
702 /// ```rust
703 /// # use hydro_lang::prelude::*;
704 /// # use futures::StreamExt;
705 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
706 /// let tick = process.tick();
707 /// // ticks are lazy by default, forces the second tick to run
708 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
709 ///
710 /// let batch_first_tick = process
711 /// .source_iter(q!(vec![1, 2, 3, 4]))
712 /// .batch(&tick, nondet!(/** test */));
713 /// let batch_second_tick = process
714 /// .source_iter(q!(vec![5, 6, 7, 8]))
715 /// .batch(&tick, nondet!(/** test */))
716 /// .defer_tick(); // appears on the second tick
717 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
718 /// batch_first_tick.chain(batch_second_tick)
719 /// .filter_if_none(some_on_first_tick)
720 /// .all_ticks()
721 /// # }, |mut stream| async move {
722 /// // [5, 6, 7, 8]
723 /// # for w in vec![5, 6, 7, 8] {
724 /// # assert_eq!(stream.next().await.unwrap(), w);
725 /// # }
726 /// # }));
727 /// ```
728 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
729 self.filter_if_some(
730 other
731 .map(q!(|_| ()))
732 .into_singleton()
733 .filter(q!(|o| o.is_none())),
734 )
735 }
736
737 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
738 /// tupled pairs in a non-deterministic order.
739 ///
740 /// # Example
741 /// ```rust
742 /// # use hydro_lang::prelude::*;
743 /// # use std::collections::HashSet;
744 /// # use futures::StreamExt;
745 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746 /// let tick = process.tick();
747 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
748 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
749 /// stream1.cross_product(stream2)
750 /// # }, |mut stream| async move {
751 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
752 /// # stream.map(|i| assert!(expected.contains(&i)));
753 /// # }));
754 /// ```
755 pub fn cross_product<T2, O2: Ordering>(
756 self,
757 other: Stream<T2, L, B, O2, R>,
758 ) -> Stream<(T, T2), L, B, NoOrder, R>
759 where
760 T: Clone,
761 T2: Clone,
762 {
763 check_matching_location(&self.location, &other.location);
764
765 Stream::new(
766 self.location.clone(),
767 HydroNode::CrossProduct {
768 left: Box::new(self.ir_node.into_inner()),
769 right: Box::new(other.ir_node.into_inner()),
770 metadata: self
771 .location
772 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
773 },
774 )
775 }
776
777 /// Takes one stream as input and filters out any duplicate occurrences. The output
778 /// contains all unique values from the input.
779 ///
780 /// # Example
781 /// ```rust
782 /// # use hydro_lang::prelude::*;
783 /// # use futures::StreamExt;
784 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
785 /// let tick = process.tick();
786 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
787 /// # }, |mut stream| async move {
788 /// # for w in vec![1, 2, 3, 4] {
789 /// # assert_eq!(stream.next().await.unwrap(), w);
790 /// # }
791 /// # }));
792 /// ```
793 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
794 where
795 T: Eq + Hash,
796 {
797 Stream::new(
798 self.location.clone(),
799 HydroNode::Unique {
800 input: Box::new(self.ir_node.into_inner()),
801 metadata: self
802 .location
803 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
804 },
805 )
806 }
807
808 /// Outputs everything in this stream that is *not* contained in the `other` stream.
809 ///
810 /// The `other` stream must be [`Bounded`], since this function will wait until
811 /// all its elements are available before producing any output.
812 /// # Example
813 /// ```rust
814 /// # use hydro_lang::prelude::*;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817 /// let tick = process.tick();
818 /// let stream = process
819 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
820 /// .batch(&tick, nondet!(/** test */));
821 /// let batch = process
822 /// .source_iter(q!(vec![1, 2]))
823 /// .batch(&tick, nondet!(/** test */));
824 /// stream.filter_not_in(batch).all_ticks()
825 /// # }, |mut stream| async move {
826 /// # for w in vec![3, 4] {
827 /// # assert_eq!(stream.next().await.unwrap(), w);
828 /// # }
829 /// # }));
830 /// ```
831 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
832 where
833 T: Eq + Hash,
834 {
835 check_matching_location(&self.location, &other.location);
836
837 Stream::new(
838 self.location.clone(),
839 HydroNode::Difference {
840 pos: Box::new(self.ir_node.into_inner()),
841 neg: Box::new(other.ir_node.into_inner()),
842 metadata: self
843 .location
844 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
845 },
846 )
847 }
848
849 /// An operator which allows you to "inspect" each element of a stream without
850 /// modifying it. The closure `f` is called on a reference to each item. This is
851 /// mainly useful for debugging, and should not be used to generate side-effects.
852 ///
853 /// # Example
854 /// ```rust
855 /// # use hydro_lang::prelude::*;
856 /// # use futures::StreamExt;
857 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
858 /// let nums = process.source_iter(q!(vec![1, 2]));
859 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
860 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
861 /// # }, |mut stream| async move {
862 /// # for w in vec![1, 2] {
863 /// # assert_eq!(stream.next().await.unwrap(), w);
864 /// # }
865 /// # }));
866 /// ```
867 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
868 where
869 F: Fn(&T) + 'a,
870 {
871 let f = f.splice_fn1_borrow_ctx(&self.location).into();
872
873 Stream::new(
874 self.location.clone(),
875 HydroNode::Inspect {
876 f,
877 input: Box::new(self.ir_node.into_inner()),
878 metadata: self.location.new_node_metadata(Self::collection_kind()),
879 },
880 )
881 }
882
883 /// An operator which allows you to "name" a `HydroNode`.
884 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
885 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
886 {
887 let mut node = self.ir_node.borrow_mut();
888 let metadata = node.metadata_mut();
889 metadata.tag = Some(name.to_string());
890 }
891 self
892 }
893
894 /// Explicitly "casts" the stream to a type with a different ordering
895 /// guarantee. Useful in unsafe code where the ordering cannot be proven
896 /// by the type-system.
897 ///
898 /// # Non-Determinism
899 /// This function is used as an escape hatch, and any mistakes in the
900 /// provided ordering guarantee will propagate into the guarantees
901 /// for the rest of the program.
902 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
903 if O::ORDERING_KIND == O2::ORDERING_KIND {
904 Stream::new(self.location, self.ir_node.into_inner())
905 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
906 // We can always weaken the ordering guarantee
907 Stream::new(
908 self.location.clone(),
909 HydroNode::Cast {
910 inner: Box::new(self.ir_node.into_inner()),
911 metadata: self
912 .location
913 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
914 },
915 )
916 } else {
917 Stream::new(
918 self.location.clone(),
919 HydroNode::ObserveNonDet {
920 inner: Box::new(self.ir_node.into_inner()),
921 trusted: false,
922 metadata: self
923 .location
924 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
925 },
926 )
927 }
928 }
929
930 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
931 // is not observable
932 fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933 if O::ORDERING_KIND == O2::ORDERING_KIND {
934 Stream::new(self.location, self.ir_node.into_inner())
935 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936 // We can always weaken the ordering guarantee
937 Stream::new(
938 self.location.clone(),
939 HydroNode::Cast {
940 inner: Box::new(self.ir_node.into_inner()),
941 metadata: self
942 .location
943 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944 },
945 )
946 } else {
947 Stream::new(
948 self.location.clone(),
949 HydroNode::ObserveNonDet {
950 inner: Box::new(self.ir_node.into_inner()),
951 trusted: true,
952 metadata: self
953 .location
954 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955 },
956 )
957 }
958 }
959
960 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
961 /// which is always safe because that is the weakest possible guarantee.
962 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
963 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
964 self.assume_ordering::<NoOrder>(nondet)
965 }
966
967 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
968 /// enforcing that `O2` is weaker than the input ordering guarantee.
969 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
970 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
971 self.assume_ordering::<O2>(nondet)
972 }
973
974 /// Explicitly "casts" the stream to a type with a different retries
975 /// guarantee. Useful in unsafe code where the lack of retries cannot
976 /// be proven by the type-system.
977 ///
978 /// # Non-Determinism
979 /// This function is used as an escape hatch, and any mistakes in the
980 /// provided retries guarantee will propagate into the guarantees
981 /// for the rest of the program.
982 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
983 if R::RETRIES_KIND == R2::RETRIES_KIND {
984 Stream::new(self.location, self.ir_node.into_inner())
985 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
986 // We can always weaken the retries guarantee
987 Stream::new(
988 self.location.clone(),
989 HydroNode::Cast {
990 inner: Box::new(self.ir_node.into_inner()),
991 metadata: self
992 .location
993 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
994 },
995 )
996 } else {
997 Stream::new(
998 self.location.clone(),
999 HydroNode::ObserveNonDet {
1000 inner: Box::new(self.ir_node.into_inner()),
1001 trusted: false,
1002 metadata: self
1003 .location
1004 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1005 },
1006 )
1007 }
1008 }
1009
1010 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1011 // is not observable
1012 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013 if R::RETRIES_KIND == R2::RETRIES_KIND {
1014 Stream::new(self.location, self.ir_node.into_inner())
1015 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016 // We can always weaken the retries guarantee
1017 Stream::new(
1018 self.location.clone(),
1019 HydroNode::Cast {
1020 inner: Box::new(self.ir_node.into_inner()),
1021 metadata: self
1022 .location
1023 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024 },
1025 )
1026 } else {
1027 Stream::new(
1028 self.location.clone(),
1029 HydroNode::ObserveNonDet {
1030 inner: Box::new(self.ir_node.into_inner()),
1031 trusted: true,
1032 metadata: self
1033 .location
1034 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035 },
1036 )
1037 }
1038 }
1039
1040 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1041 /// which is always safe because that is the weakest possible guarantee.
1042 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1043 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1044 self.assume_retries::<AtLeastOnce>(nondet)
1045 }
1046
1047 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1048 /// enforcing that `R2` is weaker than the input retries guarantee.
1049 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1050 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1051 self.assume_retries::<R2>(nondet)
1052 }
1053}
1054
1055impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1056where
1057 L: Location<'a>,
1058{
1059 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1060 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1061 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1062 self.assume_retries(
1063 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1064 )
1065 }
1066}
1067
1068impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1069where
1070 L: Location<'a>,
1071{
1072 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1073 ///
1074 /// # Example
1075 /// ```rust
1076 /// # use hydro_lang::prelude::*;
1077 /// # use futures::StreamExt;
1078 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1079 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1080 /// # }, |mut stream| async move {
1081 /// // 1, 2, 3
1082 /// # for w in vec![1, 2, 3] {
1083 /// # assert_eq!(stream.next().await.unwrap(), w);
1084 /// # }
1085 /// # }));
1086 /// ```
1087 pub fn cloned(self) -> Stream<T, L, B, O, R>
1088 where
1089 T: Clone,
1090 {
1091 self.map(q!(|d| d.clone()))
1092 }
1093}
1094
1095impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1096where
1097 L: Location<'a>,
1098{
1099 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1100 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1101 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1102 ///
1103 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1104 /// and there may be duplicates.
1105 ///
1106 /// # Example
1107 /// ```rust
1108 /// # use hydro_lang::prelude::*;
1109 /// # use futures::StreamExt;
1110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1111 /// let tick = process.tick();
1112 /// let bools = process.source_iter(q!(vec![false, true, false]));
1113 /// let batch = bools.batch(&tick, nondet!(/** test */));
1114 /// batch
1115 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1116 /// .all_ticks()
1117 /// # }, |mut stream| async move {
1118 /// // true
1119 /// # assert_eq!(stream.next().await.unwrap(), true);
1120 /// # }));
1121 /// ```
1122 pub fn fold_commutative_idempotent<A, I, F>(
1123 self,
1124 init: impl IntoQuotedMut<'a, I, L>,
1125 comb: impl IntoQuotedMut<'a, F, L>,
1126 ) -> Singleton<A, L, B>
1127 where
1128 I: Fn() -> A + 'a,
1129 F: Fn(&mut A, T),
1130 {
1131 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1132 self.assume_ordering(nondet)
1133 .assume_retries(nondet)
1134 .fold(init, comb)
1135 }
1136
1137 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1138 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1139 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1140 /// reference, so that it can be modified in place.
1141 ///
1142 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1143 /// and there may be duplicates.
1144 ///
1145 /// # Example
1146 /// ```rust
1147 /// # use hydro_lang::prelude::*;
1148 /// # use futures::StreamExt;
1149 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150 /// let tick = process.tick();
1151 /// let bools = process.source_iter(q!(vec![false, true, false]));
1152 /// let batch = bools.batch(&tick, nondet!(/** test */));
1153 /// batch
1154 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1155 /// .all_ticks()
1156 /// # }, |mut stream| async move {
1157 /// // true
1158 /// # assert_eq!(stream.next().await.unwrap(), true);
1159 /// # }));
1160 /// ```
1161 pub fn reduce_commutative_idempotent<F>(
1162 self,
1163 comb: impl IntoQuotedMut<'a, F, L>,
1164 ) -> Optional<T, L, B>
1165 where
1166 F: Fn(&mut T, T) + 'a,
1167 {
1168 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1169 self.assume_ordering(nondet)
1170 .assume_retries(nondet)
1171 .reduce(comb)
1172 }
1173
1174 // only for internal APIs that have been carefully vetted, will eventually be removed once we
1175 // have algebraic verification of these properties
1176 fn reduce_commutative_idempotent_trusted<F>(
1177 self,
1178 comb: impl IntoQuotedMut<'a, F, L>,
1179 ) -> Optional<T, L, B>
1180 where
1181 F: Fn(&mut T, T) + 'a,
1182 {
1183 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1184
1185 let ordered = if B::BOUNDED {
1186 self.assume_ordering_trusted(nondet)
1187 } else {
1188 self.assume_ordering(nondet) // if unbounded, ordering affects intermediate states
1189 };
1190
1191 ordered
1192 .assume_retries_trusted(nondet) // retries never affect intermediate states
1193 .reduce(comb)
1194 }
1195
1196 /// Computes the maximum element in the stream as an [`Optional`], which
1197 /// will be empty until the first element in the input arrives.
1198 ///
1199 /// # Example
1200 /// ```rust
1201 /// # use hydro_lang::prelude::*;
1202 /// # use futures::StreamExt;
1203 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1204 /// let tick = process.tick();
1205 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1206 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1207 /// batch.max().all_ticks()
1208 /// # }, |mut stream| async move {
1209 /// // 4
1210 /// # assert_eq!(stream.next().await.unwrap(), 4);
1211 /// # }));
1212 /// ```
1213 pub fn max(self) -> Optional<T, L, B>
1214 where
1215 T: Ord,
1216 {
1217 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1218 if new > *curr {
1219 *curr = new;
1220 }
1221 }))
1222 }
1223
1224 /// Computes the minimum element in the stream as an [`Optional`], which
1225 /// will be empty until the first element in the input arrives.
1226 ///
1227 /// # Example
1228 /// ```rust
1229 /// # use hydro_lang::prelude::*;
1230 /// # use futures::StreamExt;
1231 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232 /// let tick = process.tick();
1233 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235 /// batch.min().all_ticks()
1236 /// # }, |mut stream| async move {
1237 /// // 1
1238 /// # assert_eq!(stream.next().await.unwrap(), 1);
1239 /// # }));
1240 /// ```
1241 pub fn min(self) -> Optional<T, L, B>
1242 where
1243 T: Ord,
1244 {
1245 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1246 if new < *curr {
1247 *curr = new;
1248 }
1249 }))
1250 }
1251}
1252
1253impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1254where
1255 L: Location<'a>,
1256{
1257 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1258 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1259 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1260 ///
1261 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1262 ///
1263 /// # Example
1264 /// ```rust
1265 /// # use hydro_lang::prelude::*;
1266 /// # use futures::StreamExt;
1267 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1268 /// let tick = process.tick();
1269 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1270 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1271 /// batch
1272 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1273 /// .all_ticks()
1274 /// # }, |mut stream| async move {
1275 /// // 10
1276 /// # assert_eq!(stream.next().await.unwrap(), 10);
1277 /// # }));
1278 /// ```
1279 pub fn fold_commutative<A, I, F>(
1280 self,
1281 init: impl IntoQuotedMut<'a, I, L>,
1282 comb: impl IntoQuotedMut<'a, F, L>,
1283 ) -> Singleton<A, L, B>
1284 where
1285 I: Fn() -> A + 'a,
1286 F: Fn(&mut A, T),
1287 {
1288 let nondet = nondet!(/** the combinator function is commutative */);
1289 self.assume_ordering(nondet).fold(init, comb)
1290 }
1291
1292 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1293 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1294 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1295 /// reference, so that it can be modified in place.
1296 ///
1297 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1298 ///
1299 /// # Example
1300 /// ```rust
1301 /// # use hydro_lang::prelude::*;
1302 /// # use futures::StreamExt;
1303 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1304 /// let tick = process.tick();
1305 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1306 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1307 /// batch
1308 /// .reduce_commutative(q!(|curr, new| *curr += new))
1309 /// .all_ticks()
1310 /// # }, |mut stream| async move {
1311 /// // 10
1312 /// # assert_eq!(stream.next().await.unwrap(), 10);
1313 /// # }));
1314 /// ```
1315 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1316 where
1317 F: Fn(&mut T, T) + 'a,
1318 {
1319 let nondet = nondet!(/** the combinator function is commutative */);
1320 self.assume_ordering(nondet).reduce(comb)
1321 }
1322
1323 /// Computes the number of elements in the stream as a [`Singleton`].
1324 ///
1325 /// # Example
1326 /// ```rust
1327 /// # use hydro_lang::prelude::*;
1328 /// # use futures::StreamExt;
1329 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1330 /// let tick = process.tick();
1331 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1332 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1333 /// batch.count().all_ticks()
1334 /// # }, |mut stream| async move {
1335 /// // 4
1336 /// # assert_eq!(stream.next().await.unwrap(), 4);
1337 /// # }));
1338 /// ```
1339 pub fn count(self) -> Singleton<usize, L, B> {
1340 self.assume_ordering_trusted(nondet!(
1341 /// Order does not affect eventual count, and also does not affect intermediate states.
1342 ))
1343 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1344 }
1345}
1346
1347impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1348where
1349 L: Location<'a>,
1350{
1351 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1352 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1353 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1354 ///
1355 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1356 ///
1357 /// # Example
1358 /// ```rust
1359 /// # use hydro_lang::prelude::*;
1360 /// # use futures::StreamExt;
1361 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1362 /// let tick = process.tick();
1363 /// let bools = process.source_iter(q!(vec![false, true, false]));
1364 /// let batch = bools.batch(&tick, nondet!(/** test */));
1365 /// batch
1366 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1367 /// .all_ticks()
1368 /// # }, |mut stream| async move {
1369 /// // true
1370 /// # assert_eq!(stream.next().await.unwrap(), true);
1371 /// # }));
1372 /// ```
1373 pub fn fold_idempotent<A, I, F>(
1374 self,
1375 init: impl IntoQuotedMut<'a, I, L>,
1376 comb: impl IntoQuotedMut<'a, F, L>,
1377 ) -> Singleton<A, L, B>
1378 where
1379 I: Fn() -> A + 'a,
1380 F: Fn(&mut A, T),
1381 {
1382 let nondet = nondet!(/** the combinator function is idempotent */);
1383 self.assume_retries(nondet).fold(init, comb)
1384 }
1385
1386 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1387 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1388 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1389 /// reference, so that it can be modified in place.
1390 ///
1391 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1392 ///
1393 /// # Example
1394 /// ```rust
1395 /// # use hydro_lang::prelude::*;
1396 /// # use futures::StreamExt;
1397 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1398 /// let tick = process.tick();
1399 /// let bools = process.source_iter(q!(vec![false, true, false]));
1400 /// let batch = bools.batch(&tick, nondet!(/** test */));
1401 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1402 /// # }, |mut stream| async move {
1403 /// // true
1404 /// # assert_eq!(stream.next().await.unwrap(), true);
1405 /// # }));
1406 /// ```
1407 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1408 where
1409 F: Fn(&mut T, T) + 'a,
1410 {
1411 let nondet = nondet!(/** the combinator function is idempotent */);
1412 self.assume_retries(nondet).reduce(comb)
1413 }
1414
1415 /// Computes the first element in the stream as an [`Optional`], which
1416 /// will be empty until the first element in the input arrives.
1417 ///
1418 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1419 /// re-ordering of elements may cause the first element to change.
1420 ///
1421 /// # Example
1422 /// ```rust
1423 /// # use hydro_lang::prelude::*;
1424 /// # use futures::StreamExt;
1425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1426 /// let tick = process.tick();
1427 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1428 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1429 /// batch.first().all_ticks()
1430 /// # }, |mut stream| async move {
1431 /// // 1
1432 /// # assert_eq!(stream.next().await.unwrap(), 1);
1433 /// # }));
1434 /// ```
1435 pub fn first(self) -> Optional<T, L, B> {
1436 self.reduce_idempotent(q!(|_, _| {}))
1437 }
1438
1439 /// Computes the last element in the stream as an [`Optional`], which
1440 /// will be empty until an element in the input arrives.
1441 ///
1442 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1443 /// re-ordering of elements may cause the last element to change.
1444 ///
1445 /// # Example
1446 /// ```rust
1447 /// # use hydro_lang::prelude::*;
1448 /// # use futures::StreamExt;
1449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450 /// let tick = process.tick();
1451 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1452 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1453 /// batch.last().all_ticks()
1454 /// # }, |mut stream| async move {
1455 /// // 4
1456 /// # assert_eq!(stream.next().await.unwrap(), 4);
1457 /// # }));
1458 /// ```
1459 pub fn last(self) -> Optional<T, L, B> {
1460 self.reduce_idempotent(q!(|curr, new| *curr = new))
1461 }
1462}
1463
1464impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1465where
1466 L: Location<'a>,
1467{
1468 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1469 ///
1470 /// # Example
1471 /// ```rust
1472 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1473 /// # use futures::StreamExt;
1474 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1475 /// let tick = process.tick();
1476 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1477 /// numbers.enumerate()
1478 /// # }, |mut stream| async move {
1479 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1480 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1481 /// # assert_eq!(stream.next().await.unwrap(), w);
1482 /// # }
1483 /// # }));
1484 /// ```
1485 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1486 Stream::new(
1487 self.location.clone(),
1488 HydroNode::Enumerate {
1489 input: Box::new(self.ir_node.into_inner()),
1490 metadata: self.location.new_node_metadata(Stream::<
1491 (usize, T),
1492 L,
1493 B,
1494 TotalOrder,
1495 ExactlyOnce,
1496 >::collection_kind()),
1497 },
1498 )
1499 }
1500
1501 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1502 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1503 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1504 ///
1505 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1506 /// to depend on the order of elements in the stream.
1507 ///
1508 /// # Example
1509 /// ```rust
1510 /// # use hydro_lang::prelude::*;
1511 /// # use futures::StreamExt;
1512 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513 /// let tick = process.tick();
1514 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1515 /// let batch = words.batch(&tick, nondet!(/** test */));
1516 /// batch
1517 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1518 /// .all_ticks()
1519 /// # }, |mut stream| async move {
1520 /// // "HELLOWORLD"
1521 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1522 /// # }));
1523 /// ```
1524 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1525 self,
1526 init: impl IntoQuotedMut<'a, I, L>,
1527 comb: impl IntoQuotedMut<'a, F, L>,
1528 ) -> Singleton<A, L, B> {
1529 let init = init.splice_fn0_ctx(&self.location).into();
1530 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1531
1532 let core = HydroNode::Fold {
1533 init,
1534 acc: comb,
1535 input: Box::new(self.ir_node.into_inner()),
1536 metadata: self
1537 .location
1538 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1539 };
1540
1541 Singleton::new(self.location, core)
1542 }
1543
1544 /// Collects all the elements of this stream into a single [`Vec`] element.
1545 ///
1546 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1547 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1548 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1549 /// the vector at an arbitrary point in time.
1550 ///
1551 /// # Example
1552 /// ```rust
1553 /// # use hydro_lang::prelude::*;
1554 /// # use futures::StreamExt;
1555 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1556 /// let tick = process.tick();
1557 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1558 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1559 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1560 /// # }, |mut stream| async move {
1561 /// // [ vec![1, 2, 3, 4] ]
1562 /// # for w in vec![vec![1, 2, 3, 4]] {
1563 /// # assert_eq!(stream.next().await.unwrap(), w);
1564 /// # }
1565 /// # }));
1566 /// ```
1567 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1568 self.fold(
1569 q!(|| vec![]),
1570 q!(|acc, v| {
1571 acc.push(v);
1572 }),
1573 )
1574 }
1575
1576 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1577 /// and emitting each intermediate result.
1578 ///
1579 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1580 /// containing all intermediate accumulated values. The scan operation can also terminate early
1581 /// by returning `None`.
1582 ///
1583 /// The function takes a mutable reference to the accumulator and the current element, and returns
1584 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1585 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1586 ///
1587 /// # Examples
1588 ///
1589 /// Basic usage - running sum:
1590 /// ```rust
1591 /// # use hydro_lang::prelude::*;
1592 /// # use futures::StreamExt;
1593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1594 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1595 /// q!(|| 0),
1596 /// q!(|acc, x| {
1597 /// *acc += x;
1598 /// Some(*acc)
1599 /// }),
1600 /// )
1601 /// # }, |mut stream| async move {
1602 /// // Output: 1, 3, 6, 10
1603 /// # for w in vec![1, 3, 6, 10] {
1604 /// # assert_eq!(stream.next().await.unwrap(), w);
1605 /// # }
1606 /// # }));
1607 /// ```
1608 ///
1609 /// Early termination example:
1610 /// ```rust
1611 /// # use hydro_lang::prelude::*;
1612 /// # use futures::StreamExt;
1613 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1614 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1615 /// q!(|| 1),
1616 /// q!(|state, x| {
1617 /// *state = *state * x;
1618 /// if *state > 6 {
1619 /// None // Terminate the stream
1620 /// } else {
1621 /// Some(-*state)
1622 /// }
1623 /// }),
1624 /// )
1625 /// # }, |mut stream| async move {
1626 /// // Output: -1, -2, -6
1627 /// # for w in vec![-1, -2, -6] {
1628 /// # assert_eq!(stream.next().await.unwrap(), w);
1629 /// # }
1630 /// # }));
1631 /// ```
1632 pub fn scan<A, U, I, F>(
1633 self,
1634 init: impl IntoQuotedMut<'a, I, L>,
1635 f: impl IntoQuotedMut<'a, F, L>,
1636 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1637 where
1638 I: Fn() -> A + 'a,
1639 F: Fn(&mut A, T) -> Option<U> + 'a,
1640 {
1641 let init = init.splice_fn0_ctx(&self.location).into();
1642 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1643
1644 Stream::new(
1645 self.location.clone(),
1646 HydroNode::Scan {
1647 init,
1648 acc: f,
1649 input: Box::new(self.ir_node.into_inner()),
1650 metadata: self.location.new_node_metadata(
1651 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1652 ),
1653 },
1654 )
1655 }
1656
1657 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1658 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1659 /// until the first element in the input arrives.
1660 ///
1661 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1662 /// to depend on the order of elements in the stream.
1663 ///
1664 /// # Example
1665 /// ```rust
1666 /// # use hydro_lang::prelude::*;
1667 /// # use futures::StreamExt;
1668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669 /// let tick = process.tick();
1670 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1671 /// let batch = words.batch(&tick, nondet!(/** test */));
1672 /// batch
1673 /// .map(q!(|x| x.to_string()))
1674 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1675 /// .all_ticks()
1676 /// # }, |mut stream| async move {
1677 /// // "HELLOWORLD"
1678 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1679 /// # }));
1680 /// ```
1681 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1682 self,
1683 comb: impl IntoQuotedMut<'a, F, L>,
1684 ) -> Optional<T, L, B> {
1685 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1686 let core = HydroNode::Reduce {
1687 f,
1688 input: Box::new(self.ir_node.into_inner()),
1689 metadata: self
1690 .location
1691 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1692 };
1693
1694 Optional::new(self.location, core)
1695 }
1696}
1697
1698impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1699 /// Produces a new stream that interleaves the elements of the two input streams.
1700 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1701 ///
1702 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1703 /// [`Bounded`], you can use [`Stream::chain`] instead.
1704 ///
1705 /// # Example
1706 /// ```rust
1707 /// # use hydro_lang::prelude::*;
1708 /// # use futures::StreamExt;
1709 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1710 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1711 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1712 /// # }, |mut stream| async move {
1713 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1714 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1715 /// # assert_eq!(stream.next().await.unwrap(), w);
1716 /// # }
1717 /// # }));
1718 /// ```
1719 pub fn interleave<O2: Ordering, R2: Retries>(
1720 self,
1721 other: Stream<T, L, Unbounded, O2, R2>,
1722 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1723 where
1724 R: MinRetries<R2>,
1725 {
1726 Stream::new(
1727 self.location.clone(),
1728 HydroNode::Chain {
1729 first: Box::new(self.ir_node.into_inner()),
1730 second: Box::new(other.ir_node.into_inner()),
1731 metadata: self.location.new_node_metadata(Stream::<
1732 T,
1733 L,
1734 Unbounded,
1735 NoOrder,
1736 <R as MinRetries<R2>>::Min,
1737 >::collection_kind()),
1738 },
1739 )
1740 }
1741}
1742
1743impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1744where
1745 L: Location<'a>,
1746{
1747 /// Produces a new stream that emits the input elements in sorted order.
1748 ///
1749 /// The input stream can have any ordering guarantee, but the output stream
1750 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1751 /// elements in the input stream are available, so it requires the input stream
1752 /// to be [`Bounded`].
1753 ///
1754 /// # Example
1755 /// ```rust
1756 /// # use hydro_lang::prelude::*;
1757 /// # use futures::StreamExt;
1758 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1759 /// let tick = process.tick();
1760 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1761 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1762 /// batch.sort().all_ticks()
1763 /// # }, |mut stream| async move {
1764 /// // 1, 2, 3, 4
1765 /// # for w in (1..5) {
1766 /// # assert_eq!(stream.next().await.unwrap(), w);
1767 /// # }
1768 /// # }));
1769 /// ```
1770 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1771 where
1772 T: Ord,
1773 {
1774 Stream::new(
1775 self.location.clone(),
1776 HydroNode::Sort {
1777 input: Box::new(self.ir_node.into_inner()),
1778 metadata: self
1779 .location
1780 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1781 },
1782 )
1783 }
1784
1785 /// Produces a new stream that first emits the elements of the `self` stream,
1786 /// and then emits the elements of the `other` stream. The output stream has
1787 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1788 /// [`TotalOrder`] guarantee.
1789 ///
1790 /// Currently, both input streams must be [`Bounded`]. This operator will block
1791 /// on the first stream until all its elements are available. In a future version,
1792 /// we will relax the requirement on the `other` stream.
1793 ///
1794 /// # Example
1795 /// ```rust
1796 /// # use hydro_lang::prelude::*;
1797 /// # use futures::StreamExt;
1798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799 /// let tick = process.tick();
1800 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1801 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1802 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1803 /// # }, |mut stream| async move {
1804 /// // 2, 3, 4, 5, 1, 2, 3, 4
1805 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1806 /// # assert_eq!(stream.next().await.unwrap(), w);
1807 /// # }
1808 /// # }));
1809 /// ```
1810 pub fn chain<O2: Ordering, R2: Retries>(
1811 self,
1812 other: Stream<T, L, Bounded, O2, R2>,
1813 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1814 where
1815 O: MinOrder<O2>,
1816 R: MinRetries<R2>,
1817 {
1818 check_matching_location(&self.location, &other.location);
1819
1820 Stream::new(
1821 self.location.clone(),
1822 HydroNode::Chain {
1823 first: Box::new(self.ir_node.into_inner()),
1824 second: Box::new(other.ir_node.into_inner()),
1825 metadata: self.location.new_node_metadata(Stream::<
1826 T,
1827 L,
1828 Bounded,
1829 <O as MinOrder<O2>>::Min,
1830 <R as MinRetries<R2>>::Min,
1831 >::collection_kind()),
1832 },
1833 )
1834 }
1835
1836 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1837 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1838 /// because this is compiled into a nested loop.
1839 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1840 self,
1841 other: Stream<T2, L, Bounded, O2, R>,
1842 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1843 where
1844 T: Clone,
1845 T2: Clone,
1846 {
1847 check_matching_location(&self.location, &other.location);
1848
1849 Stream::new(
1850 self.location.clone(),
1851 HydroNode::CrossProduct {
1852 left: Box::new(self.ir_node.into_inner()),
1853 right: Box::new(other.ir_node.into_inner()),
1854 metadata: self.location.new_node_metadata(Stream::<
1855 (T, T2),
1856 L,
1857 Bounded,
1858 <O2 as MinOrder<O>>::Min,
1859 R,
1860 >::collection_kind()),
1861 },
1862 )
1863 }
1864
1865 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1866 /// `self` used as the values for *each* key.
1867 ///
1868 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1869 /// values. For example, it can be used to send the same set of elements to several cluster
1870 /// members, if the membership information is available as a [`KeyedSingleton`].
1871 ///
1872 /// # Example
1873 /// ```rust
1874 /// # use hydro_lang::prelude::*;
1875 /// # use futures::StreamExt;
1876 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1877 /// # let tick = process.tick();
1878 /// let keyed_singleton = // { 1: (), 2: () }
1879 /// # process
1880 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1881 /// # .into_keyed()
1882 /// # .batch(&tick, nondet!(/** test */))
1883 /// # .first();
1884 /// let stream = // [ "a", "b" ]
1885 /// # process
1886 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1887 /// # .batch(&tick, nondet!(/** test */));
1888 /// stream.repeat_with_keys(keyed_singleton)
1889 /// # .entries().all_ticks()
1890 /// # }, |mut stream| async move {
1891 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1892 /// # let mut results = Vec::new();
1893 /// # for _ in 0..4 {
1894 /// # results.push(stream.next().await.unwrap());
1895 /// # }
1896 /// # results.sort();
1897 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1898 /// # }));
1899 /// ```
1900 pub fn repeat_with_keys<K, V2>(
1901 self,
1902 keys: KeyedSingleton<K, V2, L, Bounded>,
1903 ) -> KeyedStream<K, T, L, Bounded, O, R>
1904 where
1905 K: Clone,
1906 T: Clone,
1907 {
1908 keys.keys()
1909 .weaken_retries()
1910 .assume_ordering_trusted::<TotalOrder>(
1911 nondet!(/** keyed stream does not depend on ordering of keys */),
1912 )
1913 .cross_product_nested_loop(self)
1914 .into_keyed()
1915 }
1916}
1917
1918impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1919where
1920 L: Location<'a>,
1921{
1922 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1923 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1924 /// by equi-joining the two streams on the key attribute `K`.
1925 ///
1926 /// # Example
1927 /// ```rust
1928 /// # use hydro_lang::prelude::*;
1929 /// # use std::collections::HashSet;
1930 /// # use futures::StreamExt;
1931 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1932 /// let tick = process.tick();
1933 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1934 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1935 /// stream1.join(stream2)
1936 /// # }, |mut stream| async move {
1937 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1938 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1939 /// # stream.map(|i| assert!(expected.contains(&i)));
1940 /// # }));
1941 pub fn join<V2, O2: Ordering, R2: Retries>(
1942 self,
1943 n: Stream<(K, V2), L, B, O2, R2>,
1944 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1945 where
1946 K: Eq + Hash,
1947 R: MinRetries<R2>,
1948 {
1949 check_matching_location(&self.location, &n.location);
1950
1951 Stream::new(
1952 self.location.clone(),
1953 HydroNode::Join {
1954 left: Box::new(self.ir_node.into_inner()),
1955 right: Box::new(n.ir_node.into_inner()),
1956 metadata: self.location.new_node_metadata(Stream::<
1957 (K, (V1, V2)),
1958 L,
1959 B,
1960 NoOrder,
1961 <R as MinRetries<R2>>::Min,
1962 >::collection_kind()),
1963 },
1964 )
1965 }
1966
1967 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1968 /// computes the anti-join of the items in the input -- i.e. returns
1969 /// unique items in the first input that do not have a matching key
1970 /// in the second input.
1971 ///
1972 /// # Example
1973 /// ```rust
1974 /// # use hydro_lang::prelude::*;
1975 /// # use futures::StreamExt;
1976 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1977 /// let tick = process.tick();
1978 /// let stream = process
1979 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1980 /// .batch(&tick, nondet!(/** test */));
1981 /// let batch = process
1982 /// .source_iter(q!(vec![1, 2]))
1983 /// .batch(&tick, nondet!(/** test */));
1984 /// stream.anti_join(batch).all_ticks()
1985 /// # }, |mut stream| async move {
1986 /// # for w in vec![(3, 'c'), (4, 'd')] {
1987 /// # assert_eq!(stream.next().await.unwrap(), w);
1988 /// # }
1989 /// # }));
1990 pub fn anti_join<O2: Ordering, R2: Retries>(
1991 self,
1992 n: Stream<K, L, Bounded, O2, R2>,
1993 ) -> Stream<(K, V1), L, B, O, R>
1994 where
1995 K: Eq + Hash,
1996 {
1997 check_matching_location(&self.location, &n.location);
1998
1999 Stream::new(
2000 self.location.clone(),
2001 HydroNode::AntiJoin {
2002 pos: Box::new(self.ir_node.into_inner()),
2003 neg: Box::new(n.ir_node.into_inner()),
2004 metadata: self
2005 .location
2006 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2007 },
2008 )
2009 }
2010}
2011
2012impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2013 Stream<(K, V), L, B, O, R>
2014{
2015 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2016 /// is used as the key and the second element is added to the entries associated with that key.
2017 ///
2018 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2019 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2020 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2021 /// total ordering _within_ each group but no ordering _across_ groups.
2022 ///
2023 /// # Example
2024 /// ```rust
2025 /// # use hydro_lang::prelude::*;
2026 /// # use futures::StreamExt;
2027 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2028 /// process
2029 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2030 /// .into_keyed()
2031 /// # .entries()
2032 /// # }, |mut stream| async move {
2033 /// // { 1: [2, 3], 2: [4] }
2034 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2035 /// # assert_eq!(stream.next().await.unwrap(), w);
2036 /// # }
2037 /// # }));
2038 /// ```
2039 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2040 KeyedStream::new(
2041 self.location.clone(),
2042 HydroNode::Cast {
2043 inner: Box::new(self.ir_node.into_inner()),
2044 metadata: self
2045 .location
2046 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2047 },
2048 )
2049 }
2050}
2051
2052impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2053where
2054 K: Eq + Hash,
2055 L: Location<'a>,
2056{
2057 #[deprecated = "use .into_keyed().fold(...) instead"]
2058 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2059 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2060 /// in the second element are accumulated via the `comb` closure.
2061 ///
2062 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2063 /// to depend on the order of elements in the stream.
2064 ///
2065 /// If the input and output value types are the same and do not require initialization then use
2066 /// [`Stream::reduce_keyed`].
2067 ///
2068 /// # Example
2069 /// ```rust
2070 /// # use hydro_lang::prelude::*;
2071 /// # use futures::StreamExt;
2072 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2073 /// let tick = process.tick();
2074 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2075 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2076 /// batch
2077 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2078 /// .all_ticks()
2079 /// # }, |mut stream| async move {
2080 /// // (1, 5), (2, 7)
2081 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2082 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2083 /// # }));
2084 /// ```
2085 pub fn fold_keyed<A, I, F>(
2086 self,
2087 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2088 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2089 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2090 where
2091 I: Fn() -> A + 'a,
2092 F: Fn(&mut A, V) + 'a,
2093 {
2094 self.into_keyed().fold(init, comb).entries()
2095 }
2096
2097 #[deprecated = "use .into_keyed().reduce(...) instead"]
2098 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2099 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2100 /// in the second element are accumulated via the `comb` closure.
2101 ///
2102 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2103 /// to depend on the order of elements in the stream.
2104 ///
2105 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2106 ///
2107 /// # Example
2108 /// ```rust
2109 /// # use hydro_lang::prelude::*;
2110 /// # use futures::StreamExt;
2111 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2112 /// let tick = process.tick();
2113 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2114 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2115 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2116 /// # }, |mut stream| async move {
2117 /// // (1, 5), (2, 7)
2118 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2119 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2120 /// # }));
2121 /// ```
2122 pub fn reduce_keyed<F>(
2123 self,
2124 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2125 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2126 where
2127 F: Fn(&mut V, V) + 'a,
2128 {
2129 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2130
2131 Stream::new(
2132 self.location.clone(),
2133 HydroNode::ReduceKeyed {
2134 f,
2135 input: Box::new(self.ir_node.into_inner()),
2136 metadata: self.location.new_node_metadata(Stream::<
2137 (K, V),
2138 Tick<L>,
2139 Bounded,
2140 NoOrder,
2141 ExactlyOnce,
2142 >::collection_kind()),
2143 },
2144 )
2145 }
2146}
2147
2148impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2149where
2150 K: Eq + Hash,
2151 L: Location<'a>,
2152{
2153 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2154 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2155 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2156 /// in the second element are accumulated via the `comb` closure.
2157 ///
2158 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2159 /// as there may be non-deterministic duplicates.
2160 ///
2161 /// If the input and output value types are the same and do not require initialization then use
2162 /// [`Stream::reduce_keyed_commutative_idempotent`].
2163 ///
2164 /// # Example
2165 /// ```rust
2166 /// # use hydro_lang::prelude::*;
2167 /// # use futures::StreamExt;
2168 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2169 /// let tick = process.tick();
2170 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2171 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2172 /// batch
2173 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2174 /// .all_ticks()
2175 /// # }, |mut stream| async move {
2176 /// // (1, false), (2, true)
2177 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2178 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2179 /// # }));
2180 /// ```
2181 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2182 self,
2183 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2184 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2185 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2186 where
2187 I: Fn() -> A + 'a,
2188 F: Fn(&mut A, V) + 'a,
2189 {
2190 self.into_keyed()
2191 .fold_commutative_idempotent(init, comb)
2192 .entries()
2193 }
2194
2195 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2196 /// # Example
2197 /// ```rust
2198 /// # use hydro_lang::prelude::*;
2199 /// # use futures::StreamExt;
2200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201 /// let tick = process.tick();
2202 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2203 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2204 /// batch.keys().all_ticks()
2205 /// # }, |mut stream| async move {
2206 /// // 1, 2
2207 /// # assert_eq!(stream.next().await.unwrap(), 1);
2208 /// # assert_eq!(stream.next().await.unwrap(), 2);
2209 /// # }));
2210 /// ```
2211 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2212 self.into_keyed()
2213 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2214 .keys()
2215 }
2216
2217 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2218 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2219 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2220 /// in the second element are accumulated via the `comb` closure.
2221 ///
2222 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2223 /// as there may be non-deterministic duplicates.
2224 ///
2225 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2226 ///
2227 /// # Example
2228 /// ```rust
2229 /// # use hydro_lang::prelude::*;
2230 /// # use futures::StreamExt;
2231 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2232 /// let tick = process.tick();
2233 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2234 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2235 /// batch
2236 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2237 /// .all_ticks()
2238 /// # }, |mut stream| async move {
2239 /// // (1, false), (2, true)
2240 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2241 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2242 /// # }));
2243 /// ```
2244 pub fn reduce_keyed_commutative_idempotent<F>(
2245 self,
2246 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2247 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2248 where
2249 F: Fn(&mut V, V) + 'a,
2250 {
2251 self.into_keyed()
2252 .reduce_commutative_idempotent(comb)
2253 .entries()
2254 }
2255}
2256
2257impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2258where
2259 K: Eq + Hash,
2260 L: Location<'a>,
2261{
2262 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2263 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2264 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2265 /// in the second element are accumulated via the `comb` closure.
2266 ///
2267 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2268 ///
2269 /// If the input and output value types are the same and do not require initialization then use
2270 /// [`Stream::reduce_keyed_commutative`].
2271 ///
2272 /// # Example
2273 /// ```rust
2274 /// # use hydro_lang::prelude::*;
2275 /// # use futures::StreamExt;
2276 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2277 /// let tick = process.tick();
2278 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2279 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2280 /// batch
2281 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2282 /// .all_ticks()
2283 /// # }, |mut stream| async move {
2284 /// // (1, 5), (2, 7)
2285 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2286 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2287 /// # }));
2288 /// ```
2289 pub fn fold_keyed_commutative<A, I, F>(
2290 self,
2291 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2292 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2293 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2294 where
2295 I: Fn() -> A + 'a,
2296 F: Fn(&mut A, V) + 'a,
2297 {
2298 self.into_keyed().fold_commutative(init, comb).entries()
2299 }
2300
2301 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2302 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2303 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2304 /// in the second element are accumulated via the `comb` closure.
2305 ///
2306 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2307 ///
2308 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2309 ///
2310 /// # Example
2311 /// ```rust
2312 /// # use hydro_lang::prelude::*;
2313 /// # use futures::StreamExt;
2314 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2315 /// let tick = process.tick();
2316 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2317 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2318 /// batch
2319 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2320 /// .all_ticks()
2321 /// # }, |mut stream| async move {
2322 /// // (1, 5), (2, 7)
2323 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2324 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2325 /// # }));
2326 /// ```
2327 pub fn reduce_keyed_commutative<F>(
2328 self,
2329 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2330 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2331 where
2332 F: Fn(&mut V, V) + 'a,
2333 {
2334 self.into_keyed().reduce_commutative(comb).entries()
2335 }
2336}
2337
2338impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2339where
2340 K: Eq + Hash,
2341 L: Location<'a>,
2342{
2343 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2344 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2345 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2346 /// in the second element are accumulated via the `comb` closure.
2347 ///
2348 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2349 ///
2350 /// If the input and output value types are the same and do not require initialization then use
2351 /// [`Stream::reduce_keyed_idempotent`].
2352 ///
2353 /// # Example
2354 /// ```rust
2355 /// # use hydro_lang::prelude::*;
2356 /// # use futures::StreamExt;
2357 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2358 /// let tick = process.tick();
2359 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2360 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2361 /// batch
2362 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2363 /// .all_ticks()
2364 /// # }, |mut stream| async move {
2365 /// // (1, false), (2, true)
2366 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2367 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2368 /// # }));
2369 /// ```
2370 pub fn fold_keyed_idempotent<A, I, F>(
2371 self,
2372 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2373 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2374 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2375 where
2376 I: Fn() -> A + 'a,
2377 F: Fn(&mut A, V) + 'a,
2378 {
2379 self.into_keyed().fold_idempotent(init, comb).entries()
2380 }
2381
2382 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2383 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2384 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2385 /// in the second element are accumulated via the `comb` closure.
2386 ///
2387 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2388 ///
2389 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2390 ///
2391 /// # Example
2392 /// ```rust
2393 /// # use hydro_lang::prelude::*;
2394 /// # use futures::StreamExt;
2395 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2396 /// let tick = process.tick();
2397 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2398 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2399 /// batch
2400 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2401 /// .all_ticks()
2402 /// # }, |mut stream| async move {
2403 /// // (1, false), (2, true)
2404 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2405 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2406 /// # }));
2407 /// ```
2408 pub fn reduce_keyed_idempotent<F>(
2409 self,
2410 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2411 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2412 where
2413 F: Fn(&mut V, V) + 'a,
2414 {
2415 self.into_keyed().reduce_idempotent(comb).entries()
2416 }
2417}
2418
2419impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2420where
2421 L: Location<'a> + NoTick,
2422{
2423 /// Returns a stream corresponding to the latest batch of elements being atomically
2424 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2425 /// the order of the input.
2426 ///
2427 /// # Non-Determinism
2428 /// The batch boundaries are non-deterministic and may change across executions.
2429 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2430 Stream::new(
2431 self.location.clone().tick,
2432 HydroNode::Batch {
2433 inner: Box::new(self.ir_node.into_inner()),
2434 metadata: self
2435 .location
2436 .tick
2437 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2438 },
2439 )
2440 }
2441
2442 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2443 /// See [`Stream::atomic`] for more details.
2444 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2445 Stream::new(
2446 self.location.tick.l.clone(),
2447 HydroNode::EndAtomic {
2448 inner: Box::new(self.ir_node.into_inner()),
2449 metadata: self
2450 .location
2451 .tick
2452 .l
2453 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2454 },
2455 )
2456 }
2457}
2458
2459impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2460where
2461 L: Location<'a>,
2462{
2463 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2464 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2465 ///
2466 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2467 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2468 /// argument that declares where the stream will be atomically processed. Batching a stream into
2469 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2470 /// [`Tick`] will introduce asynchrony.
2471 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2472 let out_location = Atomic { tick: tick.clone() };
2473 Stream::new(
2474 out_location.clone(),
2475 HydroNode::BeginAtomic {
2476 inner: Box::new(self.ir_node.into_inner()),
2477 metadata: out_location
2478 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2479 },
2480 )
2481 }
2482
2483 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2484 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2485 /// the order of the input. The output stream will execute in the [`Tick`] that was
2486 /// used to create the atomic section.
2487 ///
2488 /// # Non-Determinism
2489 /// The batch boundaries are non-deterministic and may change across executions.
2490 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2491 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2492 Stream::new(
2493 tick.clone(),
2494 HydroNode::Batch {
2495 inner: Box::new(self.ir_node.into_inner()),
2496 metadata: tick
2497 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2498 },
2499 )
2500 }
2501
2502 /// Given a time interval, returns a stream corresponding to samples taken from the
2503 /// stream roughly at that interval. The output will have elements in the same order
2504 /// as the input, but with arbitrary elements skipped between samples. There is also
2505 /// no guarantee on the exact timing of the samples.
2506 ///
2507 /// # Non-Determinism
2508 /// The output stream is non-deterministic in which elements are sampled, since this
2509 /// is controlled by a clock.
2510 pub fn sample_every(
2511 self,
2512 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2513 nondet: NonDet,
2514 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2515 where
2516 L: NoTick + NoAtomic,
2517 {
2518 let samples = self.location.source_interval(interval, nondet);
2519
2520 let tick = self.location.tick();
2521 self.batch(&tick, nondet)
2522 .filter_if_some(samples.batch(&tick, nondet).first())
2523 .all_ticks()
2524 .weakest_retries()
2525 }
2526
2527 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2528 /// stream has not emitted a value since that duration.
2529 ///
2530 /// # Non-Determinism
2531 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2532 /// samples take place, timeouts may be non-deterministically generated or missed,
2533 /// and the notification of the timeout may be delayed as well. There is also no
2534 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2535 /// detected based on when the next sample is taken.
2536 pub fn timeout(
2537 self,
2538 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2539 nondet: NonDet,
2540 ) -> Optional<(), L, Unbounded>
2541 where
2542 L: NoTick + NoAtomic,
2543 {
2544 let tick = self.location.tick();
2545
2546 let latest_received = self.assume_retries(nondet).fold_commutative(
2547 q!(|| None),
2548 q!(|latest, _| {
2549 *latest = Some(Instant::now());
2550 }),
2551 );
2552
2553 latest_received
2554 .snapshot(&tick, nondet)
2555 .filter_map(q!(move |latest_received| {
2556 if let Some(latest_received) = latest_received {
2557 if Instant::now().duration_since(latest_received) > duration {
2558 Some(())
2559 } else {
2560 None
2561 }
2562 } else {
2563 Some(())
2564 }
2565 }))
2566 .latest()
2567 }
2568}
2569
2570impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2571where
2572 L: Location<'a> + NoTick + NoAtomic,
2573 F: Future<Output = T>,
2574{
2575 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2576 /// Future outputs are produced as available, regardless of input arrival order.
2577 ///
2578 /// # Example
2579 /// ```rust
2580 /// # use std::collections::HashSet;
2581 /// # use futures::StreamExt;
2582 /// # use hydro_lang::prelude::*;
2583 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2584 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2585 /// .map(q!(|x| async move {
2586 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2587 /// x
2588 /// }))
2589 /// .resolve_futures()
2590 /// # },
2591 /// # |mut stream| async move {
2592 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2593 /// # let mut output = HashSet::new();
2594 /// # for _ in 1..10 {
2595 /// # output.insert(stream.next().await.unwrap());
2596 /// # }
2597 /// # assert_eq!(
2598 /// # output,
2599 /// # HashSet::<i32>::from_iter(1..10)
2600 /// # );
2601 /// # },
2602 /// # ));
2603 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2604 Stream::new(
2605 self.location.clone(),
2606 HydroNode::ResolveFutures {
2607 input: Box::new(self.ir_node.into_inner()),
2608 metadata: self
2609 .location
2610 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2611 },
2612 )
2613 }
2614
2615 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2616 /// Future outputs are produced in the same order as the input stream.
2617 ///
2618 /// # Example
2619 /// ```rust
2620 /// # use std::collections::HashSet;
2621 /// # use futures::StreamExt;
2622 /// # use hydro_lang::prelude::*;
2623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2624 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2625 /// .map(q!(|x| async move {
2626 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2627 /// x
2628 /// }))
2629 /// .resolve_futures_ordered()
2630 /// # },
2631 /// # |mut stream| async move {
2632 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2633 /// # let mut output = Vec::new();
2634 /// # for _ in 1..10 {
2635 /// # output.push(stream.next().await.unwrap());
2636 /// # }
2637 /// # assert_eq!(
2638 /// # output,
2639 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2640 /// # );
2641 /// # },
2642 /// # ));
2643 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2644 Stream::new(
2645 self.location.clone(),
2646 HydroNode::ResolveFuturesOrdered {
2647 input: Box::new(self.ir_node.into_inner()),
2648 metadata: self
2649 .location
2650 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2651 },
2652 )
2653 }
2654}
2655
2656impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2657where
2658 L: Location<'a> + NoTick,
2659{
2660 /// Executes the provided closure for every element in this stream.
2661 ///
2662 /// Because the closure may have side effects, the stream must have deterministic order
2663 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2664 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2665 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2666 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2667 let f = f.splice_fn1_ctx(&self.location).into();
2668 self.location
2669 .flow_state()
2670 .borrow_mut()
2671 .push_root(HydroRoot::ForEach {
2672 input: Box::new(self.ir_node.into_inner()),
2673 f,
2674 op_metadata: HydroIrOpMetadata::new(),
2675 });
2676 }
2677
2678 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2679 /// TCP socket to some other server. You should _not_ use this API for interacting with
2680 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2681 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2682 /// interaction with asynchronous sinks.
2683 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2684 where
2685 S: 'a + futures::Sink<T> + Unpin,
2686 {
2687 self.location
2688 .flow_state()
2689 .borrow_mut()
2690 .push_root(HydroRoot::DestSink {
2691 sink: sink.splice_typed_ctx(&self.location).into(),
2692 input: Box::new(self.ir_node.into_inner()),
2693 op_metadata: HydroIrOpMetadata::new(),
2694 });
2695 }
2696}
2697
2698impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2699where
2700 L: Location<'a>,
2701{
2702 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2703 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2704 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2705 Stream::new(
2706 self.location.outer().clone(),
2707 HydroNode::YieldConcat {
2708 inner: Box::new(self.ir_node.into_inner()),
2709 metadata: self
2710 .location
2711 .outer()
2712 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2713 },
2714 )
2715 }
2716
2717 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2718 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2719 ///
2720 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2721 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2722 /// stream's [`Tick`] context.
2723 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2724 let out_location = Atomic {
2725 tick: self.location.clone(),
2726 };
2727
2728 Stream::new(
2729 out_location.clone(),
2730 HydroNode::YieldConcat {
2731 inner: Box::new(self.ir_node.into_inner()),
2732 metadata: out_location
2733 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2734 },
2735 )
2736 }
2737
2738 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2739 ///
2740 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2741 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2742 /// careful when using this operator, as its memory usage will grow linearly over time since it
2743 /// must store its inputs indefinitely.
2744 ///
2745 /// # Example
2746 /// ```rust
2747 /// # use hydro_lang::prelude::*;
2748 /// # use futures::StreamExt;
2749 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2750 /// let tick = process.tick();
2751 /// // ticks are lazy by default, forces the second tick to run
2752 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2753 ///
2754 /// let batch_first_tick = process
2755 /// .source_iter(q!(vec![1, 2, 3, 4]))
2756 /// .batch(&tick, nondet!(/** test */));
2757 /// let batch_second_tick = process
2758 /// .source_iter(q!(vec![5, 6, 7, 8]))
2759 /// .batch(&tick, nondet!(/** test */))
2760 /// .defer_tick(); // appears on the second tick
2761 /// batch_first_tick.chain(batch_second_tick)
2762 /// .persist()
2763 /// .all_ticks()
2764 /// # }, |mut stream| async move {
2765 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2766 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2767 /// # assert_eq!(stream.next().await.unwrap(), w);
2768 /// # }
2769 /// # }));
2770 /// ```
2771 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2772 where
2773 T: Clone,
2774 {
2775 Stream::new(
2776 self.location.clone(),
2777 HydroNode::Persist {
2778 inner: Box::new(self.ir_node.into_inner()),
2779 metadata: self
2780 .location
2781 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2782 },
2783 )
2784 }
2785
2786 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2787 /// always has the elements of `self` at tick `T - 1`.
2788 ///
2789 /// At tick `0`, the output stream is empty, since there is no previous tick.
2790 ///
2791 /// This operator enables stateful iterative processing with ticks, by sending data from one
2792 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2793 ///
2794 /// # Example
2795 /// ```rust
2796 /// # use hydro_lang::prelude::*;
2797 /// # use futures::StreamExt;
2798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2799 /// let tick = process.tick();
2800 /// // ticks are lazy by default, forces the second tick to run
2801 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2802 ///
2803 /// let batch_first_tick = process
2804 /// .source_iter(q!(vec![1, 2, 3, 4]))
2805 /// .batch(&tick, nondet!(/** test */));
2806 /// let batch_second_tick = process
2807 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2808 /// .batch(&tick, nondet!(/** test */))
2809 /// .defer_tick(); // appears on the second tick
2810 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2811 ///
2812 /// changes_across_ticks.clone().filter_not_in(
2813 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2814 /// ).all_ticks()
2815 /// # }, |mut stream| async move {
2816 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2817 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2818 /// # assert_eq!(stream.next().await.unwrap(), w);
2819 /// # }
2820 /// # }));
2821 /// ```
2822 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2823 Stream::new(
2824 self.location.clone(),
2825 HydroNode::DeferTick {
2826 input: Box::new(self.ir_node.into_inner()),
2827 metadata: self
2828 .location
2829 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2830 },
2831 )
2832 }
2833}
2834
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(feature = "deploy")]
    use serde::{Deserialize, Serialize};
    #[cfg(feature = "deploy")]
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    use crate::location::Location;
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    // Marker types distinguishing the two processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload serialized with bincode between processes.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // Sends 0..10 from P1 to P2 over the network, then to an external; order must be preserved.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    // `first()` of a three-element batch must yield exactly one element, so the count is 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // A `reduce` over an unbounded input keeps its running sum between inputs (1, then 1 + 2).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // The atomic fold of [1, 2, 3] (= 6) is paired with every input element, in every tick.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    // A `scan` over an unbounded input keeps its accumulator between inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // `enumerate` over an unbounded input keeps counting indices across inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }

    // `unique` over an unbounded input suppresses a value already seen in an earlier input
    // (the second `1`), so the next emitted value is `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // The exhaustive simulator must find a batching in which the first batch does not contain
    // all three inputs, making the `== 3` assertion fail (hence `should_panic`).
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);

        let tick = node.tick();
        let out_port = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let mut out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

    // Batching a totally ordered stream may split it into batches but never reorders it.
    #[test]
    fn sim_batch_preserves_order() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode(&external);

        let tick = node.tick();
        let out_port = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    // For a `NoOrder` input, the simulator must explore a shuffled batching such as {1, 3}
    // then {2}, which is impossible if the input order 1, 2, 3 were preserved.
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3]).unwrap();

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

    // The number of explored instances equals the number of ordered set partitions of the
    // four inputs into batches (the ordered Bell / Fubini number for n = 4).
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch.all_ticks().send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }

    // Expected count: every permutation of the four inputs times every way to cut that
    // sequence into consecutive batches.
    #[test]
    #[ignore = "assume_ordering not yet supported on bounded collections"]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_port = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }

    // Snapshotting an unordered `count` may observe any subset of the intermediate counts,
    // but the final observed value must always be the full count of 4.
    #[test]
    fn sim_unordered_count_instance_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let out_port = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            // `assert_eq!` reports the actual last value (or `None`) on failure.
            assert_eq!(out_recv.collect::<Vec<_>>().await.last(), Some(&4));
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
}