hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::{Atomic, DeferTick, NoAtomic};
27use crate::location::{Location, NoTick, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds spell out how an ordering combines with others through
/// [`MinOrder`]: combining with itself or with [`TotalOrder`] yields the same ordering,
/// while combining with [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
40
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum TotalOrder {}

// Ties the type-level marker to its IR-level representation.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
50
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum NoOrder {}

// Ties the type-level marker to its IR-level representation.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
62
/// Helper trait for determining the weakest of two orderings.
///
/// Implemented on one ordering marker with the other marker as the `Other` parameter;
/// the result is exposed through the associated type [`MinOrder::Min`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}
69
// min(TotalOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// min(TotalOrder, TotalOrder) = TotalOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// min(NoOrder, TotalOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// min(NoOrder, NoOrder) = NoOrder
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
89
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds spell out how a retry guarantee combines with others through
/// [`MinRetries`]: combining with itself or with [`ExactlyOnce`] yields the same guarantee,
/// while combining with [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
100
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum ExactlyOnce {}

// Ties the type-level marker to its IR-level representation.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
109
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants; it is only ever used as a type-level marker.
pub enum AtLeastOnce {}

// Ties the type-level marker to its IR-level representation.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
118
/// Helper trait for determining the weakest of two retry guarantees.
///
/// Implemented on one retries marker with the other marker as the `Other` parameter;
/// the result is exposed through the associated type [`MinRetries::Min`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}
125
// min(ExactlyOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// min(ExactlyOnce, ExactlyOnce) = ExactlyOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// min(AtLeastOnce, ExactlyOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// min(AtLeastOnce, AtLeastOnce) = AtLeastOnce
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. Kept in a `RefCell` so it can be replaced
    /// through a shared reference (the `Clone` impl swaps it for a `Tee` node).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the type parameters, which are not otherwise stored in any field.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
177
178impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
179 for Stream<T, L, Unbounded, O, R>
180where
181 L: Location<'a>,
182{
183 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
184 Stream {
185 location: stream.location,
186 ir_node: stream.ir_node,
187 _phantom: PhantomData,
188 }
189 }
190}
191
192impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
193 for Stream<T, L, B, NoOrder, R>
194where
195 L: Location<'a>,
196{
197 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
198 Stream {
199 location: stream.location,
200 ir_node: stream.ir_node,
201 _phantom: PhantomData,
202 }
203 }
204}
205
206impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
207 for Stream<T, L, B, O, AtLeastOnce>
208where
209 L: Location<'a>,
210{
211 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
212 Stream {
213 location: stream.location,
214 ir_node: stream.ir_node,
215 _phantom: PhantomData,
216 }
217 }
218}
219
// Lets bounded streams inside a tick be used wherever a `DeferTick` collection is expected.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to an inherent `Stream::defer_tick` defined
        // elsewhere in this file (inherent methods take priority over trait methods);
        // confirm, since otherwise this call would recurse.
        Stream::defer_tick(self)
    }
}
228
229impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
230 for Stream<T, Tick<L>, Bounded, O, R>
231where
232 L: Location<'a>,
233{
234 type Location = Tick<L>;
235
236 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
237 Stream::new(
238 location.clone(),
239 HydroNode::CycleSource {
240 ident,
241 metadata: location.new_node_metadata(Self::collection_kind()),
242 },
243 )
244 }
245}
246
247impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
248 for Stream<T, Tick<L>, Bounded, O, R>
249where
250 L: Location<'a>,
251{
252 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
253 assert_eq!(
254 Location::id(&self.location),
255 expected_location,
256 "locations do not match"
257 );
258 self.location
259 .flow_state()
260 .borrow_mut()
261 .push_root(HydroRoot::CycleSink {
262 ident,
263 input: Box::new(self.ir_node.into_inner()),
264 op_metadata: HydroIrOpMetadata::new(),
265 });
266 }
267}
268
269impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
270 for Stream<T, L, B, O, R>
271where
272 L: Location<'a> + NoTick,
273{
274 type Location = L;
275
276 fn create_source(ident: syn::Ident, location: L) -> Self {
277 Stream::new(
278 location.clone(),
279 HydroNode::CycleSource {
280 ident,
281 metadata: location.new_node_metadata(Self::collection_kind()),
282 },
283 )
284 }
285}
286
287impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
288 for Stream<T, L, B, O, R>
289where
290 L: Location<'a> + NoTick,
291{
292 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
293 assert_eq!(
294 Location::id(&self.location),
295 expected_location,
296 "locations do not match"
297 );
298 self.location
299 .flow_state()
300 .borrow_mut()
301 .push_root(HydroRoot::CycleSink {
302 ident,
303 input: Box::new(self.ir_node.into_inner()),
304 op_metadata: HydroIrOpMetadata::new(),
305 });
306 }
307}
308
309impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
310where
311 T: Clone,
312 L: Location<'a>,
313{
314 fn clone(&self) -> Self {
315 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
316 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
317 *self.ir_node.borrow_mut() = HydroNode::Tee {
318 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
319 metadata: self.location.new_node_metadata(Self::collection_kind()),
320 };
321 }
322
323 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
324 Stream {
325 location: self.location.clone(),
326 ir_node: HydroNode::Tee {
327 inner: TeeNode(inner.0.clone()),
328 metadata: metadata.clone(),
329 }
330 .into(),
331 _phantom: PhantomData,
332 }
333 } else {
334 unreachable!()
335 }
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
340where
341 L: Location<'a>,
342{
343 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
344 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
345 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
346
347 Stream {
348 location,
349 ir_node: RefCell::new(ir_node),
350 _phantom: PhantomData,
351 }
352 }
353
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is returned by reference; no clone is made.
    pub fn location(&self) -> &L {
        &self.location
    }
358
359 pub(crate) fn collection_kind() -> CollectionKind {
360 CollectionKind::Stream {
361 bound: B::BOUND_KIND,
362 order: O::ORDERING_KIND,
363 retry: R::RETRIES_KIND,
364 element_type: stageleft::quote_type::<T>().into(),
365 }
366 }
367
368 /// Produces a stream based on invoking `f` on each element.
369 /// If you do not want to modify the stream and instead only want to view
370 /// each item use [`Stream::inspect`] instead.
371 ///
372 /// # Example
373 /// ```rust
374 /// # use hydro_lang::prelude::*;
375 /// # use futures::StreamExt;
376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377 /// let words = process.source_iter(q!(vec!["hello", "world"]));
378 /// words.map(q!(|x| x.to_uppercase()))
379 /// # }, |mut stream| async move {
380 /// # for w in vec!["HELLO", "WORLD"] {
381 /// # assert_eq!(stream.next().await.unwrap(), w);
382 /// # }
383 /// # }));
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386 where
387 F: Fn(T) -> U + 'a,
388 {
389 let f = f.splice_fn1_ctx(&self.location).into();
390 Stream::new(
391 self.location.clone(),
392 HydroNode::Map {
393 f,
394 input: Box::new(self.ir_node.into_inner()),
395 metadata: self
396 .location
397 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398 },
399 )
400 }
401
402 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404 /// for the output type `U` must produce items in a **deterministic** order.
405 ///
406 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::prelude::*;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
414 /// process
415 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
416 /// .flat_map_ordered(q!(|x| x))
417 /// # }, |mut stream| async move {
418 /// // 1, 2, 3, 4
419 /// # for w in (1..5) {
420 /// # assert_eq!(stream.next().await.unwrap(), w);
421 /// # }
422 /// # }));
423 /// ```
424 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
425 where
426 I: IntoIterator<Item = U>,
427 F: Fn(T) -> I + 'a,
428 {
429 let f = f.splice_fn1_ctx(&self.location).into();
430 Stream::new(
431 self.location.clone(),
432 HydroNode::FlatMap {
433 f,
434 input: Box::new(self.ir_node.into_inner()),
435 metadata: self
436 .location
437 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
438 },
439 )
440 }
441
442 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
443 /// for the output type `U` to produce items in any order.
444 ///
445 /// # Example
446 /// ```rust
447 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
448 /// # use futures::StreamExt;
449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
450 /// process
451 /// .source_iter(q!(vec![
452 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
453 /// std::collections::HashSet::from_iter(vec![3, 4]),
454 /// ]))
455 /// .flat_map_unordered(q!(|x| x))
456 /// # }, |mut stream| async move {
457 /// // 1, 2, 3, 4, but in no particular order
458 /// # let mut results = Vec::new();
459 /// # for w in (1..5) {
460 /// # results.push(stream.next().await.unwrap());
461 /// # }
462 /// # results.sort();
463 /// # assert_eq!(results, vec![1, 2, 3, 4]);
464 /// # }));
465 /// ```
466 pub fn flat_map_unordered<U, I, F>(
467 self,
468 f: impl IntoQuotedMut<'a, F, L>,
469 ) -> Stream<U, L, B, NoOrder, R>
470 where
471 I: IntoIterator<Item = U>,
472 F: Fn(T) -> I + 'a,
473 {
474 let f = f.splice_fn1_ctx(&self.location).into();
475 Stream::new(
476 self.location.clone(),
477 HydroNode::FlatMap {
478 f,
479 input: Box::new(self.ir_node.into_inner()),
480 metadata: self
481 .location
482 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
483 },
484 )
485 }
486
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// This is a shorthand for [`Stream::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
513
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// This is a shorthand for [`Stream::flat_map_unordered`] with the identity closure,
    /// so the output stream is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
544
545 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
546 /// `f`, preserving the order of the elements.
547 ///
548 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
549 /// not modify or take ownership of the values. If you need to modify the values while filtering
550 /// use [`Stream::filter_map`] instead.
551 ///
552 /// # Example
553 /// ```rust
554 /// # use hydro_lang::prelude::*;
555 /// # use futures::StreamExt;
556 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
557 /// process
558 /// .source_iter(q!(vec![1, 2, 3, 4]))
559 /// .filter(q!(|&x| x > 2))
560 /// # }, |mut stream| async move {
561 /// // 3, 4
562 /// # for w in (3..5) {
563 /// # assert_eq!(stream.next().await.unwrap(), w);
564 /// # }
565 /// # }));
566 /// ```
567 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
568 where
569 F: Fn(&T) -> bool + 'a,
570 {
571 let f = f.splice_fn1_borrow_ctx(&self.location).into();
572 Stream::new(
573 self.location.clone(),
574 HydroNode::Filter {
575 f,
576 input: Box::new(self.ir_node.into_inner()),
577 metadata: self.location.new_node_metadata(Self::collection_kind()),
578 },
579 )
580 }
581
582 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
583 ///
584 /// # Example
585 /// ```rust
586 /// # use hydro_lang::prelude::*;
587 /// # use futures::StreamExt;
588 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589 /// process
590 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
591 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
592 /// # }, |mut stream| async move {
593 /// // 1, 2
594 /// # for w in (1..3) {
595 /// # assert_eq!(stream.next().await.unwrap(), w);
596 /// # }
597 /// # }));
598 /// ```
599 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
600 where
601 F: Fn(T) -> Option<U> + 'a,
602 {
603 let f = f.splice_fn1_ctx(&self.location).into();
604 Stream::new(
605 self.location.clone(),
606 HydroNode::FilterMap {
607 f,
608 input: Box::new(self.ir_node.into_inner()),
609 metadata: self
610 .location
611 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
612 },
613 )
614 }
615
616 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
617 /// where `x` is the final value of `other`, a bounded [`Singleton`].
618 ///
619 /// # Example
620 /// ```rust
621 /// # use hydro_lang::prelude::*;
622 /// # use futures::StreamExt;
623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
624 /// let tick = process.tick();
625 /// let batch = process
626 /// .source_iter(q!(vec![1, 2, 3, 4]))
627 /// .batch(&tick, nondet!(/** test */));
628 /// let count = batch.clone().count(); // `count()` returns a singleton
629 /// batch.cross_singleton(count).all_ticks()
630 /// # }, |mut stream| async move {
631 /// // (1, 4), (2, 4), (3, 4), (4, 4)
632 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
633 /// # assert_eq!(stream.next().await.unwrap(), w);
634 /// # }
635 /// # }));
636 /// ```
637 pub fn cross_singleton<O2>(
638 self,
639 other: impl Into<Optional<O2, L, Bounded>>,
640 ) -> Stream<(T, O2), L, B, O, R>
641 where
642 O2: Clone,
643 {
644 let other: Optional<O2, L, Bounded> = other.into();
645 check_matching_location(&self.location, &other.location);
646
647 Stream::new(
648 self.location.clone(),
649 HydroNode::CrossSingleton {
650 left: Box::new(self.ir_node.into_inner()),
651 right: Box::new(other.ir_node.into_inner()),
652 metadata: self
653 .location
654 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
655 },
656 )
657 }
658
659 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
660 ///
661 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
662 /// leader of a cluster.
663 ///
664 /// # Example
665 /// ```rust
666 /// # use hydro_lang::prelude::*;
667 /// # use futures::StreamExt;
668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669 /// let tick = process.tick();
670 /// // ticks are lazy by default, forces the second tick to run
671 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672 ///
673 /// let batch_first_tick = process
674 /// .source_iter(q!(vec![1, 2, 3, 4]))
675 /// .batch(&tick, nondet!(/** test */));
676 /// let batch_second_tick = process
677 /// .source_iter(q!(vec![5, 6, 7, 8]))
678 /// .batch(&tick, nondet!(/** test */))
679 /// .defer_tick(); // appears on the second tick
680 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
681 /// batch_first_tick.chain(batch_second_tick)
682 /// .filter_if_some(some_on_first_tick)
683 /// .all_ticks()
684 /// # }, |mut stream| async move {
685 /// // [1, 2, 3, 4]
686 /// # for w in vec![1, 2, 3, 4] {
687 /// # assert_eq!(stream.next().await.unwrap(), w);
688 /// # }
689 /// # }));
690 /// ```
691 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
692 self.cross_singleton(signal.map(q!(|_u| ())))
693 .map(q!(|(d, _signal)| d))
694 }
695
696 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
697 ///
698 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
699 /// some local state.
700 ///
701 /// # Example
702 /// ```rust
703 /// # use hydro_lang::prelude::*;
704 /// # use futures::StreamExt;
705 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
706 /// let tick = process.tick();
707 /// // ticks are lazy by default, forces the second tick to run
708 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
709 ///
710 /// let batch_first_tick = process
711 /// .source_iter(q!(vec![1, 2, 3, 4]))
712 /// .batch(&tick, nondet!(/** test */));
713 /// let batch_second_tick = process
714 /// .source_iter(q!(vec![5, 6, 7, 8]))
715 /// .batch(&tick, nondet!(/** test */))
716 /// .defer_tick(); // appears on the second tick
717 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
718 /// batch_first_tick.chain(batch_second_tick)
719 /// .filter_if_none(some_on_first_tick)
720 /// .all_ticks()
721 /// # }, |mut stream| async move {
722 /// // [5, 6, 7, 8]
723 /// # for w in vec![5, 6, 7, 8] {
724 /// # assert_eq!(stream.next().await.unwrap(), w);
725 /// # }
726 /// # }));
727 /// ```
728 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
729 self.filter_if_some(
730 other
731 .map(q!(|_| ()))
732 .into_singleton()
733 .filter(q!(|o| o.is_none())),
734 )
735 }
736
737 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
738 /// tupled pairs in a non-deterministic order.
739 ///
740 /// # Example
741 /// ```rust
742 /// # use hydro_lang::prelude::*;
743 /// # use std::collections::HashSet;
744 /// # use futures::StreamExt;
745 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746 /// let tick = process.tick();
747 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
748 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
749 /// stream1.cross_product(stream2)
750 /// # }, |mut stream| async move {
751 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
752 /// # stream.map(|i| assert!(expected.contains(&i)));
753 /// # }));
754 /// ```
755 pub fn cross_product<T2, O2: Ordering>(
756 self,
757 other: Stream<T2, L, B, O2, R>,
758 ) -> Stream<(T, T2), L, B, NoOrder, R>
759 where
760 T: Clone,
761 T2: Clone,
762 {
763 check_matching_location(&self.location, &other.location);
764
765 Stream::new(
766 self.location.clone(),
767 HydroNode::CrossProduct {
768 left: Box::new(self.ir_node.into_inner()),
769 right: Box::new(other.ir_node.into_inner()),
770 metadata: self
771 .location
772 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
773 },
774 )
775 }
776
777 /// Takes one stream as input and filters out any duplicate occurrences. The output
778 /// contains all unique values from the input.
779 ///
780 /// # Example
781 /// ```rust
782 /// # use hydro_lang::prelude::*;
783 /// # use futures::StreamExt;
784 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
785 /// let tick = process.tick();
786 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
787 /// # }, |mut stream| async move {
788 /// # for w in vec![1, 2, 3, 4] {
789 /// # assert_eq!(stream.next().await.unwrap(), w);
790 /// # }
791 /// # }));
792 /// ```
793 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
794 where
795 T: Eq + Hash,
796 {
797 Stream::new(
798 self.location.clone(),
799 HydroNode::Unique {
800 input: Box::new(self.ir_node.into_inner()),
801 metadata: self
802 .location
803 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
804 },
805 )
806 }
807
808 /// Outputs everything in this stream that is *not* contained in the `other` stream.
809 ///
810 /// The `other` stream must be [`Bounded`], since this function will wait until
811 /// all its elements are available before producing any output.
812 /// # Example
813 /// ```rust
814 /// # use hydro_lang::prelude::*;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
817 /// let tick = process.tick();
818 /// let stream = process
819 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
820 /// .batch(&tick, nondet!(/** test */));
821 /// let batch = process
822 /// .source_iter(q!(vec![1, 2]))
823 /// .batch(&tick, nondet!(/** test */));
824 /// stream.filter_not_in(batch).all_ticks()
825 /// # }, |mut stream| async move {
826 /// # for w in vec![3, 4] {
827 /// # assert_eq!(stream.next().await.unwrap(), w);
828 /// # }
829 /// # }));
830 /// ```
831 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
832 where
833 T: Eq + Hash,
834 {
835 check_matching_location(&self.location, &other.location);
836
837 Stream::new(
838 self.location.clone(),
839 HydroNode::Difference {
840 pos: Box::new(self.ir_node.into_inner()),
841 neg: Box::new(other.ir_node.into_inner()),
842 metadata: self
843 .location
844 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
845 },
846 )
847 }
848
849 /// An operator which allows you to "inspect" each element of a stream without
850 /// modifying it. The closure `f` is called on a reference to each item. This is
851 /// mainly useful for debugging, and should not be used to generate side-effects.
852 ///
853 /// # Example
854 /// ```rust
855 /// # use hydro_lang::prelude::*;
856 /// # use futures::StreamExt;
857 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
858 /// let nums = process.source_iter(q!(vec![1, 2]));
859 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
860 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
861 /// # }, |mut stream| async move {
862 /// # for w in vec![1, 2] {
863 /// # assert_eq!(stream.next().await.unwrap(), w);
864 /// # }
865 /// # }));
866 /// ```
867 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
868 where
869 F: Fn(&T) + 'a,
870 {
871 let f = f.splice_fn1_borrow_ctx(&self.location).into();
872
873 Stream::new(
874 self.location.clone(),
875 HydroNode::Inspect {
876 f,
877 input: Box::new(self.ir_node.into_inner()),
878 metadata: self.location.new_node_metadata(Self::collection_kind()),
879 },
880 )
881 }
882
883 /// An operator which allows you to "name" a `HydroNode`.
884 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
885 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
886 {
887 let mut node = self.ir_node.borrow_mut();
888 let metadata = node.metadata_mut();
889 metadata.tag = Some(name.to_string());
890 }
891 self
892 }
893
894 /// Explicitly "casts" the stream to a type with a different ordering
895 /// guarantee. Useful in unsafe code where the ordering cannot be proven
896 /// by the type-system.
897 ///
898 /// # Non-Determinism
899 /// This function is used as an escape hatch, and any mistakes in the
900 /// provided ordering guarantee will propagate into the guarantees
901 /// for the rest of the program.
902 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
903 if O::ORDERING_KIND == O2::ORDERING_KIND {
904 Stream::new(self.location, self.ir_node.into_inner())
905 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
906 // We can always weaken the ordering guarantee
907 Stream::new(
908 self.location.clone(),
909 HydroNode::Cast {
910 inner: Box::new(self.ir_node.into_inner()),
911 metadata: self
912 .location
913 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
914 },
915 )
916 } else {
917 Stream::new(
918 self.location.clone(),
919 HydroNode::ObserveNonDet {
920 inner: Box::new(self.ir_node.into_inner()),
921 metadata: self
922 .location
923 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
924 },
925 )
926 }
927 }
928
929 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
930 /// which is always safe because that is the weakest possible guarantee.
931 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
932 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
933 self.assume_ordering::<NoOrder>(nondet)
934 }
935
936 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
937 /// enforcing that `O2` is weaker than the input ordering guarantee.
938 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
939 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
940 self.assume_ordering::<O2>(nondet)
941 }
942
943 /// Explicitly "casts" the stream to a type with a different retries
944 /// guarantee. Useful in unsafe code where the lack of retries cannot
945 /// be proven by the type-system.
946 ///
947 /// # Non-Determinism
948 /// This function is used as an escape hatch, and any mistakes in the
949 /// provided retries guarantee will propagate into the guarantees
950 /// for the rest of the program.
951 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
952 if R::RETRIES_KIND == R2::RETRIES_KIND {
953 Stream::new(self.location, self.ir_node.into_inner())
954 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
955 // We can always weaken the retries guarantee
956 Stream::new(
957 self.location.clone(),
958 HydroNode::Cast {
959 inner: Box::new(self.ir_node.into_inner()),
960 metadata: self
961 .location
962 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
963 },
964 )
965 } else {
966 Stream::new(
967 self.location.clone(),
968 HydroNode::ObserveNonDet {
969 inner: Box::new(self.ir_node.into_inner()),
970 metadata: self
971 .location
972 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
973 },
974 )
975 }
976 }
977
978 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
979 /// which is always safe because that is the weakest possible guarantee.
980 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
981 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
982 self.assume_retries::<AtLeastOnce>(nondet)
983 }
984
985 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
986 /// enforcing that `R2` is weaker than the input retries guarantee.
987 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
988 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
989 self.assume_retries::<R2>(nondet)
990 }
991}
992
993impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
994where
995 L: Location<'a>,
996{
997 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
998 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
999 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1000 self.assume_retries(
1001 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1002 )
1003 }
1004}
1005
1006impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1007where
1008 L: Location<'a>,
1009{
1010 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1011 ///
1012 /// # Example
1013 /// ```rust
1014 /// # use hydro_lang::prelude::*;
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1017 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1018 /// # }, |mut stream| async move {
1019 /// // 1, 2, 3
1020 /// # for w in vec![1, 2, 3] {
1021 /// # assert_eq!(stream.next().await.unwrap(), w);
1022 /// # }
1023 /// # }));
1024 /// ```
1025 pub fn cloned(self) -> Stream<T, L, B, O, R>
1026 where
1027 T: Clone,
1028 {
1029 self.map(q!(|d| d.clone()))
1030 }
1031}
1032
1033impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1034where
1035 L: Location<'a>,
1036{
1037 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1038 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1039 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1040 ///
1041 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1042 /// and there may be duplicates.
1043 ///
1044 /// # Example
1045 /// ```rust
1046 /// # use hydro_lang::prelude::*;
1047 /// # use futures::StreamExt;
1048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049 /// let tick = process.tick();
1050 /// let bools = process.source_iter(q!(vec![false, true, false]));
1051 /// let batch = bools.batch(&tick, nondet!(/** test */));
1052 /// batch
1053 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1054 /// .all_ticks()
1055 /// # }, |mut stream| async move {
1056 /// // true
1057 /// # assert_eq!(stream.next().await.unwrap(), true);
1058 /// # }));
1059 /// ```
1060 pub fn fold_commutative_idempotent<A, I, F>(
1061 self,
1062 init: impl IntoQuotedMut<'a, I, L>,
1063 comb: impl IntoQuotedMut<'a, F, L>,
1064 ) -> Singleton<A, L, B>
1065 where
1066 I: Fn() -> A + 'a,
1067 F: Fn(&mut A, T),
1068 {
1069 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1070 self.assume_ordering(nondet)
1071 .assume_retries(nondet)
1072 .fold(init, comb)
1073 }
1074
1075 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1076 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1077 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1078 /// reference, so that it can be modified in place.
1079 ///
1080 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1081 /// and there may be duplicates.
1082 ///
1083 /// # Example
1084 /// ```rust
1085 /// # use hydro_lang::prelude::*;
1086 /// # use futures::StreamExt;
1087 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1088 /// let tick = process.tick();
1089 /// let bools = process.source_iter(q!(vec![false, true, false]));
1090 /// let batch = bools.batch(&tick, nondet!(/** test */));
1091 /// batch
1092 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1093 /// .all_ticks()
1094 /// # }, |mut stream| async move {
1095 /// // true
1096 /// # assert_eq!(stream.next().await.unwrap(), true);
1097 /// # }));
1098 /// ```
1099 pub fn reduce_commutative_idempotent<F>(
1100 self,
1101 comb: impl IntoQuotedMut<'a, F, L>,
1102 ) -> Optional<T, L, B>
1103 where
1104 F: Fn(&mut T, T) + 'a,
1105 {
1106 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1107 self.assume_ordering(nondet)
1108 .assume_retries(nondet)
1109 .reduce(comb)
1110 }
1111
1112 /// Computes the maximum element in the stream as an [`Optional`], which
1113 /// will be empty until the first element in the input arrives.
1114 ///
1115 /// # Example
1116 /// ```rust
1117 /// # use hydro_lang::prelude::*;
1118 /// # use futures::StreamExt;
1119 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1120 /// let tick = process.tick();
1121 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1122 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1123 /// batch.max().all_ticks()
1124 /// # }, |mut stream| async move {
1125 /// // 4
1126 /// # assert_eq!(stream.next().await.unwrap(), 4);
1127 /// # }));
1128 /// ```
1129 pub fn max(self) -> Optional<T, L, B>
1130 where
1131 T: Ord,
1132 {
1133 self.reduce_commutative_idempotent(q!(|curr, new| {
1134 if new > *curr {
1135 *curr = new;
1136 }
1137 }))
1138 }
1139
1140 /// Computes the minimum element in the stream as an [`Optional`], which
1141 /// will be empty until the first element in the input arrives.
1142 ///
1143 /// # Example
1144 /// ```rust
1145 /// # use hydro_lang::prelude::*;
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148 /// let tick = process.tick();
1149 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1150 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1151 /// batch.min().all_ticks()
1152 /// # }, |mut stream| async move {
1153 /// // 1
1154 /// # assert_eq!(stream.next().await.unwrap(), 1);
1155 /// # }));
1156 /// ```
1157 pub fn min(self) -> Optional<T, L, B>
1158 where
1159 T: Ord,
1160 {
1161 self.reduce_commutative_idempotent(q!(|curr, new| {
1162 if new < *curr {
1163 *curr = new;
1164 }
1165 }))
1166 }
1167}
1168
1169impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1170where
1171 L: Location<'a>,
1172{
1173 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1174 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1175 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1176 ///
1177 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1178 ///
1179 /// # Example
1180 /// ```rust
1181 /// # use hydro_lang::prelude::*;
1182 /// # use futures::StreamExt;
1183 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1184 /// let tick = process.tick();
1185 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1186 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1187 /// batch
1188 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1189 /// .all_ticks()
1190 /// # }, |mut stream| async move {
1191 /// // 10
1192 /// # assert_eq!(stream.next().await.unwrap(), 10);
1193 /// # }));
1194 /// ```
1195 pub fn fold_commutative<A, I, F>(
1196 self,
1197 init: impl IntoQuotedMut<'a, I, L>,
1198 comb: impl IntoQuotedMut<'a, F, L>,
1199 ) -> Singleton<A, L, B>
1200 where
1201 I: Fn() -> A + 'a,
1202 F: Fn(&mut A, T),
1203 {
1204 let nondet = nondet!(/** the combinator function is commutative */);
1205 self.assume_ordering(nondet).fold(init, comb)
1206 }
1207
1208 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1209 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1210 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1211 /// reference, so that it can be modified in place.
1212 ///
1213 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1214 ///
1215 /// # Example
1216 /// ```rust
1217 /// # use hydro_lang::prelude::*;
1218 /// # use futures::StreamExt;
1219 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1220 /// let tick = process.tick();
1221 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1222 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1223 /// batch
1224 /// .reduce_commutative(q!(|curr, new| *curr += new))
1225 /// .all_ticks()
1226 /// # }, |mut stream| async move {
1227 /// // 10
1228 /// # assert_eq!(stream.next().await.unwrap(), 10);
1229 /// # }));
1230 /// ```
1231 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1232 where
1233 F: Fn(&mut T, T) + 'a,
1234 {
1235 let nondet = nondet!(/** the combinator function is commutative */);
1236 self.assume_ordering(nondet).reduce(comb)
1237 }
1238
1239 /// Computes the number of elements in the stream as a [`Singleton`].
1240 ///
1241 /// # Example
1242 /// ```rust
1243 /// # use hydro_lang::prelude::*;
1244 /// # use futures::StreamExt;
1245 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1246 /// let tick = process.tick();
1247 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1248 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1249 /// batch.count().all_ticks()
1250 /// # }, |mut stream| async move {
1251 /// // 4
1252 /// # assert_eq!(stream.next().await.unwrap(), 4);
1253 /// # }));
1254 /// ```
1255 pub fn count(self) -> Singleton<usize, L, B> {
1256 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1257 }
1258}
1259
1260impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1261where
1262 L: Location<'a>,
1263{
1264 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1265 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1266 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1267 ///
1268 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1269 ///
1270 /// # Example
1271 /// ```rust
1272 /// # use hydro_lang::prelude::*;
1273 /// # use futures::StreamExt;
1274 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1275 /// let tick = process.tick();
1276 /// let bools = process.source_iter(q!(vec![false, true, false]));
1277 /// let batch = bools.batch(&tick, nondet!(/** test */));
1278 /// batch
1279 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1280 /// .all_ticks()
1281 /// # }, |mut stream| async move {
1282 /// // true
1283 /// # assert_eq!(stream.next().await.unwrap(), true);
1284 /// # }));
1285 /// ```
1286 pub fn fold_idempotent<A, I, F>(
1287 self,
1288 init: impl IntoQuotedMut<'a, I, L>,
1289 comb: impl IntoQuotedMut<'a, F, L>,
1290 ) -> Singleton<A, L, B>
1291 where
1292 I: Fn() -> A + 'a,
1293 F: Fn(&mut A, T),
1294 {
1295 let nondet = nondet!(/** the combinator function is idempotent */);
1296 self.assume_retries(nondet).fold(init, comb)
1297 }
1298
1299 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1300 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1301 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1302 /// reference, so that it can be modified in place.
1303 ///
1304 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1305 ///
1306 /// # Example
1307 /// ```rust
1308 /// # use hydro_lang::prelude::*;
1309 /// # use futures::StreamExt;
1310 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1311 /// let tick = process.tick();
1312 /// let bools = process.source_iter(q!(vec![false, true, false]));
1313 /// let batch = bools.batch(&tick, nondet!(/** test */));
1314 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1315 /// # }, |mut stream| async move {
1316 /// // true
1317 /// # assert_eq!(stream.next().await.unwrap(), true);
1318 /// # }));
1319 /// ```
1320 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1321 where
1322 F: Fn(&mut T, T) + 'a,
1323 {
1324 let nondet = nondet!(/** the combinator function is idempotent */);
1325 self.assume_retries(nondet).reduce(comb)
1326 }
1327
1328 /// Computes the first element in the stream as an [`Optional`], which
1329 /// will be empty until the first element in the input arrives.
1330 ///
1331 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1332 /// re-ordering of elements may cause the first element to change.
1333 ///
1334 /// # Example
1335 /// ```rust
1336 /// # use hydro_lang::prelude::*;
1337 /// # use futures::StreamExt;
1338 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1339 /// let tick = process.tick();
1340 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1341 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1342 /// batch.first().all_ticks()
1343 /// # }, |mut stream| async move {
1344 /// // 1
1345 /// # assert_eq!(stream.next().await.unwrap(), 1);
1346 /// # }));
1347 /// ```
1348 pub fn first(self) -> Optional<T, L, B> {
1349 self.reduce_idempotent(q!(|_, _| {}))
1350 }
1351
1352 /// Computes the last element in the stream as an [`Optional`], which
1353 /// will be empty until an element in the input arrives.
1354 ///
1355 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1356 /// re-ordering of elements may cause the last element to change.
1357 ///
1358 /// # Example
1359 /// ```rust
1360 /// # use hydro_lang::prelude::*;
1361 /// # use futures::StreamExt;
1362 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1363 /// let tick = process.tick();
1364 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1365 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1366 /// batch.last().all_ticks()
1367 /// # }, |mut stream| async move {
1368 /// // 4
1369 /// # assert_eq!(stream.next().await.unwrap(), 4);
1370 /// # }));
1371 /// ```
1372 pub fn last(self) -> Optional<T, L, B> {
1373 self.reduce_idempotent(q!(|curr, new| *curr = new))
1374 }
1375}
1376
1377impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1378where
1379 L: Location<'a>,
1380{
1381 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1382 ///
1383 /// # Example
1384 /// ```rust
1385 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1386 /// # use futures::StreamExt;
1387 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1388 /// let tick = process.tick();
1389 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1390 /// numbers.enumerate()
1391 /// # }, |mut stream| async move {
1392 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1393 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1394 /// # assert_eq!(stream.next().await.unwrap(), w);
1395 /// # }
1396 /// # }));
1397 /// ```
1398 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1399 Stream::new(
1400 self.location.clone(),
1401 HydroNode::Enumerate {
1402 input: Box::new(self.ir_node.into_inner()),
1403 metadata: self.location.new_node_metadata(Stream::<
1404 (usize, T),
1405 L,
1406 B,
1407 TotalOrder,
1408 ExactlyOnce,
1409 >::collection_kind()),
1410 },
1411 )
1412 }
1413
1414 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1415 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1416 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1417 ///
1418 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1419 /// to depend on the order of elements in the stream.
1420 ///
1421 /// # Example
1422 /// ```rust
1423 /// # use hydro_lang::prelude::*;
1424 /// # use futures::StreamExt;
1425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1426 /// let tick = process.tick();
1427 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1428 /// let batch = words.batch(&tick, nondet!(/** test */));
1429 /// batch
1430 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1431 /// .all_ticks()
1432 /// # }, |mut stream| async move {
1433 /// // "HELLOWORLD"
1434 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1435 /// # }));
1436 /// ```
1437 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1438 self,
1439 init: impl IntoQuotedMut<'a, I, L>,
1440 comb: impl IntoQuotedMut<'a, F, L>,
1441 ) -> Singleton<A, L, B> {
1442 let init = init.splice_fn0_ctx(&self.location).into();
1443 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1444
1445 let core = HydroNode::Fold {
1446 init,
1447 acc: comb,
1448 input: Box::new(self.ir_node.into_inner()),
1449 metadata: self
1450 .location
1451 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1452 };
1453
1454 Singleton::new(self.location, core)
1455 }
1456
1457 /// Collects all the elements of this stream into a single [`Vec`] element.
1458 ///
1459 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1460 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1461 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1462 /// the vector at an arbitrary point in time.
1463 ///
1464 /// # Example
1465 /// ```rust
1466 /// # use hydro_lang::prelude::*;
1467 /// # use futures::StreamExt;
1468 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1469 /// let tick = process.tick();
1470 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1471 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1472 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1473 /// # }, |mut stream| async move {
1474 /// // [ vec![1, 2, 3, 4] ]
1475 /// # for w in vec![vec![1, 2, 3, 4]] {
1476 /// # assert_eq!(stream.next().await.unwrap(), w);
1477 /// # }
1478 /// # }));
1479 /// ```
1480 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1481 self.fold(
1482 q!(|| vec![]),
1483 q!(|acc, v| {
1484 acc.push(v);
1485 }),
1486 )
1487 }
1488
1489 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1490 /// and emitting each intermediate result.
1491 ///
1492 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1493 /// containing all intermediate accumulated values. The scan operation can also terminate early
1494 /// by returning `None`.
1495 ///
1496 /// The function takes a mutable reference to the accumulator and the current element, and returns
1497 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1498 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1499 ///
1500 /// # Examples
1501 ///
1502 /// Basic usage - running sum:
1503 /// ```rust
1504 /// # use hydro_lang::prelude::*;
1505 /// # use futures::StreamExt;
1506 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1508 /// q!(|| 0),
1509 /// q!(|acc, x| {
1510 /// *acc += x;
1511 /// Some(*acc)
1512 /// }),
1513 /// )
1514 /// # }, |mut stream| async move {
1515 /// // Output: 1, 3, 6, 10
1516 /// # for w in vec![1, 3, 6, 10] {
1517 /// # assert_eq!(stream.next().await.unwrap(), w);
1518 /// # }
1519 /// # }));
1520 /// ```
1521 ///
1522 /// Early termination example:
1523 /// ```rust
1524 /// # use hydro_lang::prelude::*;
1525 /// # use futures::StreamExt;
1526 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1527 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1528 /// q!(|| 1),
1529 /// q!(|state, x| {
1530 /// *state = *state * x;
1531 /// if *state > 6 {
1532 /// None // Terminate the stream
1533 /// } else {
1534 /// Some(-*state)
1535 /// }
1536 /// }),
1537 /// )
1538 /// # }, |mut stream| async move {
1539 /// // Output: -1, -2, -6
1540 /// # for w in vec![-1, -2, -6] {
1541 /// # assert_eq!(stream.next().await.unwrap(), w);
1542 /// # }
1543 /// # }));
1544 /// ```
1545 pub fn scan<A, U, I, F>(
1546 self,
1547 init: impl IntoQuotedMut<'a, I, L>,
1548 f: impl IntoQuotedMut<'a, F, L>,
1549 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1550 where
1551 I: Fn() -> A + 'a,
1552 F: Fn(&mut A, T) -> Option<U> + 'a,
1553 {
1554 let init = init.splice_fn0_ctx(&self.location).into();
1555 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1556
1557 Stream::new(
1558 self.location.clone(),
1559 HydroNode::Scan {
1560 init,
1561 acc: f,
1562 input: Box::new(self.ir_node.into_inner()),
1563 metadata: self.location.new_node_metadata(
1564 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1565 ),
1566 },
1567 )
1568 }
1569
1570 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1571 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1572 /// until the first element in the input arrives.
1573 ///
1574 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1575 /// to depend on the order of elements in the stream.
1576 ///
1577 /// # Example
1578 /// ```rust
1579 /// # use hydro_lang::prelude::*;
1580 /// # use futures::StreamExt;
1581 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1582 /// let tick = process.tick();
1583 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1584 /// let batch = words.batch(&tick, nondet!(/** test */));
1585 /// batch
1586 /// .map(q!(|x| x.to_string()))
1587 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1588 /// .all_ticks()
1589 /// # }, |mut stream| async move {
1590 /// // "HELLOWORLD"
1591 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1592 /// # }));
1593 /// ```
1594 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1595 self,
1596 comb: impl IntoQuotedMut<'a, F, L>,
1597 ) -> Optional<T, L, B> {
1598 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1599 let core = HydroNode::Reduce {
1600 f,
1601 input: Box::new(self.ir_node.into_inner()),
1602 metadata: self
1603 .location
1604 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1605 };
1606
1607 Optional::new(self.location, core)
1608 }
1609}
1610
1611impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1612 /// Produces a new stream that interleaves the elements of the two input streams.
1613 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1614 ///
1615 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1616 /// [`Bounded`], you can use [`Stream::chain`] instead.
1617 ///
1618 /// # Example
1619 /// ```rust
1620 /// # use hydro_lang::prelude::*;
1621 /// # use futures::StreamExt;
1622 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1623 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1624 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1625 /// # }, |mut stream| async move {
1626 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1627 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1628 /// # assert_eq!(stream.next().await.unwrap(), w);
1629 /// # }
1630 /// # }));
1631 /// ```
1632 pub fn interleave<O2: Ordering, R2: Retries>(
1633 self,
1634 other: Stream<T, L, Unbounded, O2, R2>,
1635 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1636 where
1637 R: MinRetries<R2>,
1638 {
1639 let tick = self.location.tick();
1640 // Because the outputs are unordered, we can interleave batches from both streams.
1641 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1642 self.batch(&tick, nondet_batch_interleaving)
1643 .weakest_ordering()
1644 .chain(
1645 other
1646 .batch(&tick, nondet_batch_interleaving)
1647 .weakest_ordering(),
1648 )
1649 .all_ticks()
1650 }
1651}
1652
1653impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1654where
1655 L: Location<'a>,
1656{
1657 /// Produces a new stream that emits the input elements in sorted order.
1658 ///
1659 /// The input stream can have any ordering guarantee, but the output stream
1660 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1661 /// elements in the input stream are available, so it requires the input stream
1662 /// to be [`Bounded`].
1663 ///
1664 /// # Example
1665 /// ```rust
1666 /// # use hydro_lang::prelude::*;
1667 /// # use futures::StreamExt;
1668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669 /// let tick = process.tick();
1670 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1671 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672 /// batch.sort().all_ticks()
1673 /// # }, |mut stream| async move {
1674 /// // 1, 2, 3, 4
1675 /// # for w in (1..5) {
1676 /// # assert_eq!(stream.next().await.unwrap(), w);
1677 /// # }
1678 /// # }));
1679 /// ```
1680 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1681 where
1682 T: Ord,
1683 {
1684 Stream::new(
1685 self.location.clone(),
1686 HydroNode::Sort {
1687 input: Box::new(self.ir_node.into_inner()),
1688 metadata: self
1689 .location
1690 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1691 },
1692 )
1693 }
1694
1695 /// Produces a new stream that first emits the elements of the `self` stream,
1696 /// and then emits the elements of the `other` stream. The output stream has
1697 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1698 /// [`TotalOrder`] guarantee.
1699 ///
1700 /// Currently, both input streams must be [`Bounded`]. This operator will block
1701 /// on the first stream until all its elements are available. In a future version,
1702 /// we will relax the requirement on the `other` stream.
1703 ///
1704 /// # Example
1705 /// ```rust
1706 /// # use hydro_lang::prelude::*;
1707 /// # use futures::StreamExt;
1708 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1709 /// let tick = process.tick();
1710 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1711 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1712 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1713 /// # }, |mut stream| async move {
1714 /// // 2, 3, 4, 5, 1, 2, 3, 4
1715 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1716 /// # assert_eq!(stream.next().await.unwrap(), w);
1717 /// # }
1718 /// # }));
1719 /// ```
1720 pub fn chain<O2: Ordering, R2: Retries>(
1721 self,
1722 other: Stream<T, L, Bounded, O2, R2>,
1723 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1724 where
1725 O: MinOrder<O2>,
1726 R: MinRetries<R2>,
1727 {
1728 check_matching_location(&self.location, &other.location);
1729
1730 Stream::new(
1731 self.location.clone(),
1732 HydroNode::Chain {
1733 first: Box::new(self.ir_node.into_inner()),
1734 second: Box::new(other.ir_node.into_inner()),
1735 metadata: self.location.new_node_metadata(Stream::<
1736 T,
1737 L,
1738 Bounded,
1739 <O as MinOrder<O2>>::Min,
1740 <R as MinRetries<R2>>::Min,
1741 >::collection_kind()),
1742 },
1743 )
1744 }
1745
1746 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1747 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1748 /// because this is compiled into a nested loop.
1749 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1750 self,
1751 other: Stream<T2, L, Bounded, O2, R>,
1752 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1753 where
1754 T: Clone,
1755 T2: Clone,
1756 {
1757 check_matching_location(&self.location, &other.location);
1758
1759 Stream::new(
1760 self.location.clone(),
1761 HydroNode::CrossProduct {
1762 left: Box::new(self.ir_node.into_inner()),
1763 right: Box::new(other.ir_node.into_inner()),
1764 metadata: self.location.new_node_metadata(Stream::<
1765 (T, T2),
1766 L,
1767 Bounded,
1768 <O2 as MinOrder<O>>::Min,
1769 R,
1770 >::collection_kind()),
1771 },
1772 )
1773 }
1774
1775 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1776 /// `self` used as the values for *each* key.
1777 ///
1778 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1779 /// values. For example, it can be used to send the same set of elements to several cluster
1780 /// members, if the membership information is available as a [`KeyedSingleton`].
1781 ///
1782 /// # Example
1783 /// ```rust
1784 /// # use hydro_lang::prelude::*;
1785 /// # use futures::StreamExt;
1786 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1787 /// # let tick = process.tick();
1788 /// let keyed_singleton = // { 1: (), 2: () }
1789 /// # process
1790 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1791 /// # .into_keyed()
1792 /// # .batch(&tick, nondet!(/** test */))
1793 /// # .first();
1794 /// let stream = // [ "a", "b" ]
1795 /// # process
1796 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1797 /// # .batch(&tick, nondet!(/** test */));
1798 /// stream.repeat_with_keys(keyed_singleton)
1799 /// # .entries().all_ticks()
1800 /// # }, |mut stream| async move {
1801 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1802 /// # let mut results = Vec::new();
1803 /// # for _ in 0..4 {
1804 /// # results.push(stream.next().await.unwrap());
1805 /// # }
1806 /// # results.sort();
1807 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1808 /// # }));
1809 /// ```
1810 pub fn repeat_with_keys<K, V2>(
1811 self,
1812 keys: KeyedSingleton<K, V2, L, Bounded>,
1813 ) -> KeyedStream<K, T, L, Bounded, O, R>
1814 where
1815 K: Clone,
1816 T: Clone,
1817 {
1818 keys.keys()
1819 .weaken_retries()
1820 .assume_ordering::<TotalOrder>(
1821 nondet!(/** keyed stream does not depend on ordering of keys */),
1822 )
1823 .cross_product_nested_loop(self)
1824 .into_keyed()
1825 }
1826}
1827
1828impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1829where
1830 L: Location<'a>,
1831{
1832 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1833 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1834 /// by equi-joining the two streams on the key attribute `K`.
1835 ///
1836 /// # Example
1837 /// ```rust
1838 /// # use hydro_lang::prelude::*;
1839 /// # use std::collections::HashSet;
1840 /// # use futures::StreamExt;
1841 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1842 /// let tick = process.tick();
1843 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1844 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1845 /// stream1.join(stream2)
1846 /// # }, |mut stream| async move {
1847 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1848 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1849 /// # stream.map(|i| assert!(expected.contains(&i)));
1850 /// # }));
1851 pub fn join<V2, O2: Ordering, R2: Retries>(
1852 self,
1853 n: Stream<(K, V2), L, B, O2, R2>,
1854 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1855 where
1856 K: Eq + Hash,
1857 R: MinRetries<R2>,
1858 {
1859 check_matching_location(&self.location, &n.location);
1860
1861 Stream::new(
1862 self.location.clone(),
1863 HydroNode::Join {
1864 left: Box::new(self.ir_node.into_inner()),
1865 right: Box::new(n.ir_node.into_inner()),
1866 metadata: self.location.new_node_metadata(Stream::<
1867 (K, (V1, V2)),
1868 L,
1869 B,
1870 NoOrder,
1871 <R as MinRetries<R2>>::Min,
1872 >::collection_kind()),
1873 },
1874 )
1875 }
1876
1877 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1878 /// computes the anti-join of the items in the input -- i.e. returns
1879 /// unique items in the first input that do not have a matching key
1880 /// in the second input.
1881 ///
1882 /// # Example
1883 /// ```rust
1884 /// # use hydro_lang::prelude::*;
1885 /// # use futures::StreamExt;
1886 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1887 /// let tick = process.tick();
1888 /// let stream = process
1889 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1890 /// .batch(&tick, nondet!(/** test */));
1891 /// let batch = process
1892 /// .source_iter(q!(vec![1, 2]))
1893 /// .batch(&tick, nondet!(/** test */));
1894 /// stream.anti_join(batch).all_ticks()
1895 /// # }, |mut stream| async move {
1896 /// # for w in vec![(3, 'c'), (4, 'd')] {
1897 /// # assert_eq!(stream.next().await.unwrap(), w);
1898 /// # }
1899 /// # }));
1900 pub fn anti_join<O2: Ordering, R2: Retries>(
1901 self,
1902 n: Stream<K, L, Bounded, O2, R2>,
1903 ) -> Stream<(K, V1), L, B, O, R>
1904 where
1905 K: Eq + Hash,
1906 {
1907 check_matching_location(&self.location, &n.location);
1908
1909 Stream::new(
1910 self.location.clone(),
1911 HydroNode::AntiJoin {
1912 pos: Box::new(self.ir_node.into_inner()),
1913 neg: Box::new(n.ir_node.into_inner()),
1914 metadata: self
1915 .location
1916 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1917 },
1918 )
1919 }
1920}
1921
1922impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1923 Stream<(K, V), L, B, O, R>
1924{
1925 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1926 /// is used as the key and the second element is added to the entries associated with that key.
1927 ///
1928 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1929 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1930 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1931 /// total ordering _within_ each group but no ordering _across_ groups.
1932 ///
1933 /// # Example
1934 /// ```rust
1935 /// # use hydro_lang::prelude::*;
1936 /// # use futures::StreamExt;
1937 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1938 /// process
1939 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1940 /// .into_keyed()
1941 /// # .entries()
1942 /// # }, |mut stream| async move {
1943 /// // { 1: [2, 3], 2: [4] }
1944 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1945 /// # assert_eq!(stream.next().await.unwrap(), w);
1946 /// # }
1947 /// # }));
1948 /// ```
1949 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1950 KeyedStream::new(
1951 self.location.clone(),
1952 HydroNode::Cast {
1953 inner: Box::new(self.ir_node.into_inner()),
1954 metadata: self
1955 .location
1956 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1957 },
1958 )
1959 }
1960}
1961
1962impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1963where
1964 K: Eq + Hash,
1965 L: Location<'a>,
1966{
1967 #[deprecated = "use .into_keyed().fold(...) instead"]
1968 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1969 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1970 /// in the second element are accumulated via the `comb` closure.
1971 ///
1972 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1973 /// to depend on the order of elements in the stream.
1974 ///
1975 /// If the input and output value types are the same and do not require initialization then use
1976 /// [`Stream::reduce_keyed`].
1977 ///
1978 /// # Example
1979 /// ```rust
1980 /// # use hydro_lang::prelude::*;
1981 /// # use futures::StreamExt;
1982 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1983 /// let tick = process.tick();
1984 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1985 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1986 /// batch
1987 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1988 /// .all_ticks()
1989 /// # }, |mut stream| async move {
1990 /// // (1, 5), (2, 7)
1991 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1992 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1993 /// # }));
1994 /// ```
1995 pub fn fold_keyed<A, I, F>(
1996 self,
1997 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1998 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1999 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2000 where
2001 I: Fn() -> A + 'a,
2002 F: Fn(&mut A, V) + 'a,
2003 {
2004 self.into_keyed().fold(init, comb).entries()
2005 }
2006
2007 #[deprecated = "use .into_keyed().reduce(...) instead"]
2008 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2009 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2010 /// in the second element are accumulated via the `comb` closure.
2011 ///
2012 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2013 /// to depend on the order of elements in the stream.
2014 ///
2015 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2016 ///
2017 /// # Example
2018 /// ```rust
2019 /// # use hydro_lang::prelude::*;
2020 /// # use futures::StreamExt;
2021 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2022 /// let tick = process.tick();
2023 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2024 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2025 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2026 /// # }, |mut stream| async move {
2027 /// // (1, 5), (2, 7)
2028 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2029 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2030 /// # }));
2031 /// ```
2032 pub fn reduce_keyed<F>(
2033 self,
2034 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2035 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2036 where
2037 F: Fn(&mut V, V) + 'a,
2038 {
2039 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2040
2041 Stream::new(
2042 self.location.clone(),
2043 HydroNode::ReduceKeyed {
2044 f,
2045 input: Box::new(self.ir_node.into_inner()),
2046 metadata: self.location.new_node_metadata(Stream::<
2047 (K, V),
2048 Tick<L>,
2049 Bounded,
2050 NoOrder,
2051 ExactlyOnce,
2052 >::collection_kind()),
2053 },
2054 )
2055 }
2056}
2057
2058impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2059where
2060 K: Eq + Hash,
2061 L: Location<'a>,
2062{
2063 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2064 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2065 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2066 /// in the second element are accumulated via the `comb` closure.
2067 ///
2068 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2069 /// as there may be non-deterministic duplicates.
2070 ///
2071 /// If the input and output value types are the same and do not require initialization then use
2072 /// [`Stream::reduce_keyed_commutative_idempotent`].
2073 ///
2074 /// # Example
2075 /// ```rust
2076 /// # use hydro_lang::prelude::*;
2077 /// # use futures::StreamExt;
2078 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2079 /// let tick = process.tick();
2080 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2081 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2082 /// batch
2083 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2084 /// .all_ticks()
2085 /// # }, |mut stream| async move {
2086 /// // (1, false), (2, true)
2087 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2088 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2089 /// # }));
2090 /// ```
2091 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2092 self,
2093 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2094 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2095 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2096 where
2097 I: Fn() -> A + 'a,
2098 F: Fn(&mut A, V) + 'a,
2099 {
2100 self.into_keyed()
2101 .fold_commutative_idempotent(init, comb)
2102 .entries()
2103 }
2104
2105 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2106 /// # Example
2107 /// ```rust
2108 /// # use hydro_lang::prelude::*;
2109 /// # use futures::StreamExt;
2110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2111 /// let tick = process.tick();
2112 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2113 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2114 /// batch.keys().all_ticks()
2115 /// # }, |mut stream| async move {
2116 /// // 1, 2
2117 /// # assert_eq!(stream.next().await.unwrap(), 1);
2118 /// # assert_eq!(stream.next().await.unwrap(), 2);
2119 /// # }));
2120 /// ```
2121 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2122 self.into_keyed()
2123 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2124 .keys()
2125 }
2126
2127 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2128 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2129 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2130 /// in the second element are accumulated via the `comb` closure.
2131 ///
2132 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2133 /// as there may be non-deterministic duplicates.
2134 ///
2135 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2136 ///
2137 /// # Example
2138 /// ```rust
2139 /// # use hydro_lang::prelude::*;
2140 /// # use futures::StreamExt;
2141 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2142 /// let tick = process.tick();
2143 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2144 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2145 /// batch
2146 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2147 /// .all_ticks()
2148 /// # }, |mut stream| async move {
2149 /// // (1, false), (2, true)
2150 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2151 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2152 /// # }));
2153 /// ```
2154 pub fn reduce_keyed_commutative_idempotent<F>(
2155 self,
2156 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2157 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2158 where
2159 F: Fn(&mut V, V) + 'a,
2160 {
2161 self.into_keyed()
2162 .reduce_commutative_idempotent(comb)
2163 .entries()
2164 }
2165}
2166
2167impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2168where
2169 K: Eq + Hash,
2170 L: Location<'a>,
2171{
2172 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2173 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2174 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2175 /// in the second element are accumulated via the `comb` closure.
2176 ///
2177 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2178 ///
2179 /// If the input and output value types are the same and do not require initialization then use
2180 /// [`Stream::reduce_keyed_commutative`].
2181 ///
2182 /// # Example
2183 /// ```rust
2184 /// # use hydro_lang::prelude::*;
2185 /// # use futures::StreamExt;
2186 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2187 /// let tick = process.tick();
2188 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2189 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2190 /// batch
2191 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2192 /// .all_ticks()
2193 /// # }, |mut stream| async move {
2194 /// // (1, 5), (2, 7)
2195 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2196 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2197 /// # }));
2198 /// ```
2199 pub fn fold_keyed_commutative<A, I, F>(
2200 self,
2201 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2202 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2203 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2204 where
2205 I: Fn() -> A + 'a,
2206 F: Fn(&mut A, V) + 'a,
2207 {
2208 self.into_keyed().fold_commutative(init, comb).entries()
2209 }
2210
2211 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2212 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2213 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2214 /// in the second element are accumulated via the `comb` closure.
2215 ///
2216 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2217 ///
2218 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2219 ///
2220 /// # Example
2221 /// ```rust
2222 /// # use hydro_lang::prelude::*;
2223 /// # use futures::StreamExt;
2224 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2225 /// let tick = process.tick();
2226 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2227 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2228 /// batch
2229 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2230 /// .all_ticks()
2231 /// # }, |mut stream| async move {
2232 /// // (1, 5), (2, 7)
2233 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2234 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2235 /// # }));
2236 /// ```
2237 pub fn reduce_keyed_commutative<F>(
2238 self,
2239 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2240 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2241 where
2242 F: Fn(&mut V, V) + 'a,
2243 {
2244 self.into_keyed().reduce_commutative(comb).entries()
2245 }
2246}
2247
2248impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2249where
2250 K: Eq + Hash,
2251 L: Location<'a>,
2252{
2253 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2254 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2255 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2256 /// in the second element are accumulated via the `comb` closure.
2257 ///
2258 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2259 ///
2260 /// If the input and output value types are the same and do not require initialization then use
2261 /// [`Stream::reduce_keyed_idempotent`].
2262 ///
2263 /// # Example
2264 /// ```rust
2265 /// # use hydro_lang::prelude::*;
2266 /// # use futures::StreamExt;
2267 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2268 /// let tick = process.tick();
2269 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2270 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2271 /// batch
2272 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2273 /// .all_ticks()
2274 /// # }, |mut stream| async move {
2275 /// // (1, false), (2, true)
2276 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2277 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2278 /// # }));
2279 /// ```
2280 pub fn fold_keyed_idempotent<A, I, F>(
2281 self,
2282 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2283 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2284 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2285 where
2286 I: Fn() -> A + 'a,
2287 F: Fn(&mut A, V) + 'a,
2288 {
2289 self.into_keyed().fold_idempotent(init, comb).entries()
2290 }
2291
2292 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2293 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2294 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2295 /// in the second element are accumulated via the `comb` closure.
2296 ///
2297 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2298 ///
2299 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2300 ///
2301 /// # Example
2302 /// ```rust
2303 /// # use hydro_lang::prelude::*;
2304 /// # use futures::StreamExt;
2305 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2306 /// let tick = process.tick();
2307 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2308 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2309 /// batch
2310 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2311 /// .all_ticks()
2312 /// # }, |mut stream| async move {
2313 /// // (1, false), (2, true)
2314 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2315 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2316 /// # }));
2317 /// ```
2318 pub fn reduce_keyed_idempotent<F>(
2319 self,
2320 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2321 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2322 where
2323 F: Fn(&mut V, V) + 'a,
2324 {
2325 self.into_keyed().reduce_idempotent(comb).entries()
2326 }
2327}
2328
2329impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2330where
2331 L: Location<'a> + NoTick,
2332{
2333 /// Returns a stream corresponding to the latest batch of elements being atomically
2334 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2335 /// the order of the input.
2336 ///
2337 /// # Non-Determinism
2338 /// The batch boundaries are non-deterministic and may change across executions.
2339 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2340 Stream::new(
2341 self.location.clone().tick,
2342 HydroNode::Batch {
2343 inner: Box::new(self.ir_node.into_inner()),
2344 metadata: self
2345 .location
2346 .tick
2347 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2348 },
2349 )
2350 }
2351
2352 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2353 /// See [`Stream::atomic`] for more details.
2354 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2355 Stream::new(
2356 self.location.tick.l.clone(),
2357 HydroNode::EndAtomic {
2358 inner: Box::new(self.ir_node.into_inner()),
2359 metadata: self
2360 .location
2361 .tick
2362 .l
2363 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2364 },
2365 )
2366 }
2367}
2368
2369impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2370where
2371 L: Location<'a>,
2372{
2373 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2374 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2375 ///
2376 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2377 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2378 /// argument that declares where the stream will be atomically processed. Batching a stream into
2379 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2380 /// [`Tick`] will introduce asynchrony.
2381 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2382 let out_location = Atomic { tick: tick.clone() };
2383 Stream::new(
2384 out_location.clone(),
2385 HydroNode::BeginAtomic {
2386 inner: Box::new(self.ir_node.into_inner()),
2387 metadata: out_location
2388 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2389 },
2390 )
2391 }
2392
2393 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2394 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2395 /// the order of the input. The output stream will execute in the [`Tick`] that was
2396 /// used to create the atomic section.
2397 ///
2398 /// # Non-Determinism
2399 /// The batch boundaries are non-deterministic and may change across executions.
2400 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2401 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2402 Stream::new(
2403 tick.clone(),
2404 HydroNode::Batch {
2405 inner: Box::new(self.ir_node.into_inner()),
2406 metadata: tick
2407 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2408 },
2409 )
2410 }
2411
2412 /// Given a time interval, returns a stream corresponding to samples taken from the
2413 /// stream roughly at that interval. The output will have elements in the same order
2414 /// as the input, but with arbitrary elements skipped between samples. There is also
2415 /// no guarantee on the exact timing of the samples.
2416 ///
2417 /// # Non-Determinism
2418 /// The output stream is non-deterministic in which elements are sampled, since this
2419 /// is controlled by a clock.
2420 pub fn sample_every(
2421 self,
2422 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2423 nondet: NonDet,
2424 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2425 where
2426 L: NoTick + NoAtomic,
2427 {
2428 let samples = self.location.source_interval(interval, nondet);
2429
2430 let tick = self.location.tick();
2431 self.batch(&tick, nondet)
2432 .filter_if_some(samples.batch(&tick, nondet).first())
2433 .all_ticks()
2434 .weakest_retries()
2435 }
2436
2437 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2438 /// stream has not emitted a value since that duration.
2439 ///
2440 /// # Non-Determinism
2441 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2442 /// samples take place, timeouts may be non-deterministically generated or missed,
2443 /// and the notification of the timeout may be delayed as well. There is also no
2444 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2445 /// detected based on when the next sample is taken.
2446 pub fn timeout(
2447 self,
2448 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2449 nondet: NonDet,
2450 ) -> Optional<(), L, Unbounded>
2451 where
2452 L: NoTick + NoAtomic,
2453 {
2454 let tick = self.location.tick();
2455
2456 let latest_received = self.assume_retries(nondet).fold_commutative(
2457 q!(|| None),
2458 q!(|latest, _| {
2459 *latest = Some(Instant::now());
2460 }),
2461 );
2462
2463 latest_received
2464 .snapshot(&tick, nondet)
2465 .filter_map(q!(move |latest_received| {
2466 if let Some(latest_received) = latest_received {
2467 if Instant::now().duration_since(latest_received) > duration {
2468 Some(())
2469 } else {
2470 None
2471 }
2472 } else {
2473 Some(())
2474 }
2475 }))
2476 .latest()
2477 }
2478}
2479
2480impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2481where
2482 L: Location<'a> + NoTick + NoAtomic,
2483 F: Future<Output = T>,
2484{
2485 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2486 /// Future outputs are produced as available, regardless of input arrival order.
2487 ///
2488 /// # Example
2489 /// ```rust
2490 /// # use std::collections::HashSet;
2491 /// # use futures::StreamExt;
2492 /// # use hydro_lang::prelude::*;
2493 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2494 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2495 /// .map(q!(|x| async move {
2496 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2497 /// x
2498 /// }))
2499 /// .resolve_futures()
2500 /// # },
2501 /// # |mut stream| async move {
2502 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2503 /// # let mut output = HashSet::new();
2504 /// # for _ in 1..10 {
2505 /// # output.insert(stream.next().await.unwrap());
2506 /// # }
2507 /// # assert_eq!(
2508 /// # output,
2509 /// # HashSet::<i32>::from_iter(1..10)
2510 /// # );
2511 /// # },
2512 /// # ));
2513 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2514 Stream::new(
2515 self.location.clone(),
2516 HydroNode::ResolveFutures {
2517 input: Box::new(self.ir_node.into_inner()),
2518 metadata: self
2519 .location
2520 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2521 },
2522 )
2523 }
2524
2525 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2526 /// Future outputs are produced in the same order as the input stream.
2527 ///
2528 /// # Example
2529 /// ```rust
2530 /// # use std::collections::HashSet;
2531 /// # use futures::StreamExt;
2532 /// # use hydro_lang::prelude::*;
2533 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2534 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2535 /// .map(q!(|x| async move {
2536 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2537 /// x
2538 /// }))
2539 /// .resolve_futures_ordered()
2540 /// # },
2541 /// # |mut stream| async move {
2542 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2543 /// # let mut output = Vec::new();
2544 /// # for _ in 1..10 {
2545 /// # output.push(stream.next().await.unwrap());
2546 /// # }
2547 /// # assert_eq!(
2548 /// # output,
2549 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2550 /// # );
2551 /// # },
2552 /// # ));
2553 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2554 Stream::new(
2555 self.location.clone(),
2556 HydroNode::ResolveFuturesOrdered {
2557 input: Box::new(self.ir_node.into_inner()),
2558 metadata: self
2559 .location
2560 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2561 },
2562 )
2563 }
2564}
2565
2566impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2567where
2568 L: Location<'a> + NoTick,
2569{
2570 /// Executes the provided closure for every element in this stream.
2571 ///
2572 /// Because the closure may have side effects, the stream must have deterministic order
2573 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2574 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2575 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2576 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2577 let f = f.splice_fn1_ctx(&self.location).into();
2578 self.location
2579 .flow_state()
2580 .borrow_mut()
2581 .push_root(HydroRoot::ForEach {
2582 input: Box::new(self.ir_node.into_inner()),
2583 f,
2584 op_metadata: HydroIrOpMetadata::new(),
2585 });
2586 }
2587
2588 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2589 /// TCP socket to some other server. You should _not_ use this API for interacting with
2590 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2591 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2592 /// interaction with asynchronous sinks.
2593 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2594 where
2595 S: 'a + futures::Sink<T> + Unpin,
2596 {
2597 self.location
2598 .flow_state()
2599 .borrow_mut()
2600 .push_root(HydroRoot::DestSink {
2601 sink: sink.splice_typed_ctx(&self.location).into(),
2602 input: Box::new(self.ir_node.into_inner()),
2603 op_metadata: HydroIrOpMetadata::new(),
2604 });
2605 }
2606}
2607
2608impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2609where
2610 L: Location<'a>,
2611{
2612 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2613 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2614 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2615 Stream::new(
2616 self.location.outer().clone(),
2617 HydroNode::YieldConcat {
2618 inner: Box::new(self.ir_node.into_inner()),
2619 metadata: self
2620 .location
2621 .outer()
2622 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2623 },
2624 )
2625 }
2626
2627 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2628 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2629 ///
2630 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2631 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2632 /// stream's [`Tick`] context.
2633 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2634 let out_location = Atomic {
2635 tick: self.location.clone(),
2636 };
2637
2638 Stream::new(
2639 out_location.clone(),
2640 HydroNode::YieldConcat {
2641 inner: Box::new(self.ir_node.into_inner()),
2642 metadata: out_location
2643 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2644 },
2645 )
2646 }
2647
2648 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2649 ///
2650 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2651 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2652 /// careful when using this operator, as its memory usage will grow linearly over time since it
2653 /// must store its inputs indefinitely.
2654 ///
2655 /// # Example
2656 /// ```rust
2657 /// # use hydro_lang::prelude::*;
2658 /// # use futures::StreamExt;
2659 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2660 /// let tick = process.tick();
2661 /// // ticks are lazy by default, forces the second tick to run
2662 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2663 ///
2664 /// let batch_first_tick = process
2665 /// .source_iter(q!(vec![1, 2, 3, 4]))
2666 /// .batch(&tick, nondet!(/** test */));
2667 /// let batch_second_tick = process
2668 /// .source_iter(q!(vec![5, 6, 7, 8]))
2669 /// .batch(&tick, nondet!(/** test */))
2670 /// .defer_tick(); // appears on the second tick
2671 /// batch_first_tick.chain(batch_second_tick)
2672 /// .persist()
2673 /// .all_ticks()
2674 /// # }, |mut stream| async move {
2675 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2676 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2677 /// # assert_eq!(stream.next().await.unwrap(), w);
2678 /// # }
2679 /// # }));
2680 /// ```
2681 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2682 where
2683 T: Clone,
2684 {
2685 Stream::new(
2686 self.location.clone(),
2687 HydroNode::Persist {
2688 inner: Box::new(self.ir_node.into_inner()),
2689 metadata: self
2690 .location
2691 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2692 },
2693 )
2694 }
2695
2696 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2697 /// always has the elements of `self` at tick `T - 1`.
2698 ///
2699 /// At tick `0`, the output stream is empty, since there is no previous tick.
2700 ///
2701 /// This operator enables stateful iterative processing with ticks, by sending data from one
2702 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2703 ///
2704 /// # Example
2705 /// ```rust
2706 /// # use hydro_lang::prelude::*;
2707 /// # use futures::StreamExt;
2708 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2709 /// let tick = process.tick();
2710 /// // ticks are lazy by default, forces the second tick to run
2711 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2712 ///
2713 /// let batch_first_tick = process
2714 /// .source_iter(q!(vec![1, 2, 3, 4]))
2715 /// .batch(&tick, nondet!(/** test */));
2716 /// let batch_second_tick = process
2717 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2718 /// .batch(&tick, nondet!(/** test */))
2719 /// .defer_tick(); // appears on the second tick
2720 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2721 ///
2722 /// changes_across_ticks.clone().filter_not_in(
2723 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2724 /// ).all_ticks()
2725 /// # }, |mut stream| async move {
2726 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2727 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2728 /// # assert_eq!(stream.next().await.unwrap(), w);
2729 /// # }
2730 /// # }));
2731 /// ```
2732 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2733 Stream::new(
2734 self.location.clone(),
2735 HydroNode::DeferTick {
2736 input: Box::new(self.ir_node.into_inner()),
2737 metadata: self
2738 .location
2739 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2740 },
2741 )
2742 }
2743}
2744
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
    use crate::location::Location;
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    /// Type tag for the first process in the distributed test.
    struct P1 {}
    /// Type tag for the second process (also reused as the external tag) in the distributed test.
    struct P2 {}

    /// Serializable payload shipped between processes in `first_ten_distributed`.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process through a second one to an external port and checks
    /// that the values come out in the same order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let sender = flow.process::<P1>();
        let relay = flow.process::<P2>();
        let external = flow.external::<P2>();

        let out_port = sender
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&relay)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let received = external_out.next().await.unwrap();
            assert_eq!(received.n, expected);
        }
    }

    /// Taking `first()` of a three-element stream and counting the result must yield 1.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let first_element = tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first();
        let count_port = first_element
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count_port).await;

        deployment.start().await.unwrap();

        let observed = external_out.next().await.unwrap();
        assert_eq!(observed, 1);
    }

    /// A `reduce` over an unbounded input keeps its accumulator between inputs: after sending
    /// 1 and then 2, the sampled results are 1 and then 3.
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let running_sum = input.reduce(q!(|acc, v| *acc += v));
        let out_port = running_sum
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // (value sent, running total expected back)
        for (sent, expected) in [(1, 1), (2, 3)] {
            external_in.send(sent).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// An atomic `fold` over a fixed source, snapshotted into a tick, is visible with the same
    /// total (6) in every tick that it is crossed with.
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        // Built in the same order as a single chained expression would: batch first, then
        // the folded singleton, then the cross product.
        let batched = input.batch(&tick, nondet!(/** test */));
        let folded_sum = node
            .source_iter(q!(vec![1, 2, 3]))
            .atomic(&tick)
            .fold(q!(|| 0), q!(|acc, v| *acc += v))
            .snapshot_atomic(nondet!(/** test */));
        let out_port = batched
            .cross_singleton(folded_sum)
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for sent in [1, 2] {
            external_in.send(sent).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), (sent, 6));
        }
    }

    /// A `scan` over an unbounded input keeps its accumulator between inputs: after sending
    /// 1 and then 2, it emits 1 and then 3.
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let prefix_sums = input.scan(
            q!(|| 0),
            q!(|acc, v| {
                *acc += v;
                Some(*acc)
            }),
        );
        let out_port = prefix_sums.send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // (value sent, prefix sum expected back)
        for (sent, expected) in [(1, 1), (2, 3)] {
            external_in.send(sent).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// `enumerate` over an unbounded input keeps counting between inputs: the second element
    /// gets index 1, not 0.
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let indexed = input.enumerate();
        let out_port = indexed.send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // (value sent, (index, value) expected back)
        for (sent, expected) in [(1, (0, 1)), (2, (1, 2))] {
            external_in.send(sent).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// `unique` over an unbounded input remembers what it has already emitted: a repeated 1
    /// is suppressed, so the next output after it is the fresh value 3.
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let deduplicated = input.unique();
        let out_port = deduplicated.send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // Values never seen before pass straight through.
        for fresh in [1, 2] {
            external_in.send(fresh).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), fresh);
        }

        // The duplicate 1 must be dropped, so the next output is the 3 sent after it.
        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// Expected to panic: asserting that the first batch always has size 3 does not hold
    /// under non-deterministic batching.
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, TotalOrder, _>(&external);

        let tick = node.tick();
        let batch_sizes = input.batch(&tick, nondet!(/** test */)).count();
        let out_port = batch_sizes.all_ticks().send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let mut out_recv = compiled.connect(&out_port);
            compiled.launch();

            for _ in 0..3 {
                in_send.send(()).unwrap();
            }

            let first_batch_size = out_recv.next().await.unwrap();
            assert_eq!(first_batch_size, 3); // fails with nondet batching
        });
    }

    /// Batching and then yielding all ticks must preserve the input order in the simulator.
    #[test]
    fn sim_batch_preserves_order() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode(&external);

        let tick = node.tick();
        let batched = input.batch(&tick, nondet!(/** test */));
        let out_port = batched.all_ticks().send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            for value in [1, 2, 3] {
                in_send.send(value).unwrap();
            }

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    /// Batching an unordered input of four elements: every element must still be delivered,
    /// and the exhaustive simulator must report exactly 75 instances.
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();

        let (port, input) = node.source_external_bincode::<_, _, NoOrder, _>(&external);

        let tick = node.tick();
        let out_port = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instance_count = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send_many_unordered([1, 2, 3, 4]).unwrap();
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        // ∑ (k=1 to 4) S(4,k) × k! = 75
        let expected_instances = 75;
        assert_eq!(instance_count, expected_instances);
    }
}