// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode the "minimum" lattice over orderings: every
/// ordering is the min of itself with itself, taking the min with [`TotalOrder`]
/// preserves `Self`, and taking the min with [`NoOrder`] always yields
/// [`NoOrder`]. The trait is sealed, so no markers outside this module can exist.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // Lowered into the IR as a totally-ordered stream.
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
58
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // Lowered into the IR as an unordered stream.
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket rule: `O` is weaker than `O2` exactly when `O` is the min of the pair.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82 /// The weaker of the two orderings.
83 type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88 type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93 type Min = NoOrder;
94}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode the "minimum" lattice over retry guarantees,
/// mirroring [`Ordering`]: [`ExactlyOnce`] is the top element and
/// [`AtLeastOnce`] is the bottom element.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
106
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // Lowered into the IR as an exactly-once stream.
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
115
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // Lowered into the IR as an at-least-once stream.
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket rule: `R` is weaker than `R2` exactly when `R` is the min of the pair.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The computed minimum is always a weaker guarantee than both inputs, so a
    /// stream with either input's guarantee can be converted to the minimum.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the top element: the min with anything is the other side.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the bottom element: the min with anything is `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound on APIs that only make sense for ordered streams; the
/// `on_unimplemented` diagnostic above guides users toward `.assume_ordering`.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps this impl out of compiler suggestions so the custom
// diagnostic above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
162
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound on APIs that only make sense for duplicate-free streams; the
/// `on_unimplemented` diagnostic above guides users toward `.assume_retries`.
pub trait IsExactlyOnce: Retries {}

// `do_not_recommend` keeps this impl out of compiler suggestions so the custom
// diagnostic above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is materialized.
    pub(crate) location: Loc,
    // The IR node backing this stream. Wrapped in a `RefCell` so that consuming
    // operators can take the node out via `replace`, leaving a placeholder.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow graph under construction, used to register roots.
    pub(crate) flow_state: FlowState,

    // Carries the type parameters without storing values of those types.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    // Dropping the ordering guarantee is always safe; delegate to the explicit API.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
252
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    // Dropping the exactly-once guarantee is always safe; delegate to the explicit API.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
262
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Fully-qualified call avoids ambiguity with this trait method itself.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
271
impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    // Creates the receiving end of a tick-local cycle: a `CycleSource` node that
    // will be paired with a `CycleSink` when the cycle is completed.
    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
289
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    // Creates a cycle source whose contents on the first tick are `initial`, and on
    // every later tick are the values sent into the cycle during the previous tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // `DeferTick` shifts the cycle's contents by one tick, so tick N observes
        // what was written during tick N-1.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // `initial` is gated to only flow on the very first tick.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
312
impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Closes a tick-local cycle by registering this stream's node as the sink
    // feeding the matching `CycleSource`.
    //
    // Panics if this stream is not materialized at `expected_location`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
334
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    // Creates the receiving end of a forward reference outside of ticks;
    // structurally identical to the tick-cycle source, but at a top-level location.
    fn create_source(cycle_id: CycleId, location: L) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
352
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    // Resolves a forward reference by registering this stream's node as the sink
    // feeding the matching `CycleSource`.
    //
    // Panics if this stream is not materialized at `expected_location`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
374
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    // Cloning a stream shares the underlying IR node via a `Tee` rather than
    // duplicating the upstream computation.
    fn clone(&self) -> Self {
        // On the first clone, rewrite this handle's node in place into a `Tee`
        // wrapping the original node behind a shared `Rc`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // Now guaranteed to be a `Tee`; the clone gets a second `Tee` pointing
        // at the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408 L: Location<'a>,
409{
    /// Wraps an IR node as a `Stream` handle at `location`.
    ///
    /// The node must already be tagged with this location's id and this stream
    /// type's exact collection kind; both invariants are checked in debug builds.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        let flow_state = location.flow_state().clone();
        Stream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
422
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
427
    /// Builds the IR-level [`CollectionKind`] describing this stream type:
    /// its boundedness, ordering, retry guarantee, and element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
436
437 /// Produces a stream based on invoking `f` on each element.
438 /// If you do not want to modify the stream and instead only want to view
439 /// each item use [`Stream::inspect`] instead.
440 ///
441 /// # Example
442 /// ```rust
443 /// # #[cfg(feature = "deploy")] {
444 /// # use hydro_lang::prelude::*;
445 /// # use futures::StreamExt;
446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
447 /// let words = process.source_iter(q!(vec!["hello", "world"]));
448 /// words.map(q!(|x| x.to_uppercase()))
449 /// # }, |mut stream| async move {
450 /// # for w in vec!["HELLO", "WORLD"] {
451 /// # assert_eq!(stream.next().await.unwrap(), w);
452 /// # }
453 /// # }));
454 /// # }
455 /// ```
456 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
457 where
458 F: Fn(T) -> U + 'a,
459 {
460 let f = f.splice_fn1_ctx(&self.location).into();
461 Stream::new(
462 self.location.clone(),
463 HydroNode::Map {
464 f,
465 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466 metadata: self
467 .location
468 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
469 },
470 )
471 }
472
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        // Stage the user closure against this location's context.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                // Take ownership of the upstream node, leaving a placeholder behind.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
514
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output ordering is `NoOrder` since the inner iterator's order
                // is not trusted.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
561
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to flatten.
        self.flat_map_ordered(q!(|d| d))
    }
590
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to flatten.
        self.flat_map_unordered(q!(|d| d))
    }
623
    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
    /// then emit the elements of that stream one by one. When the inner stream yields
    /// `Pending`, this operator yields as well.
    pub fn flat_map_stream_blocking<U, S, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, O, R>
    where
        S: futures::Stream<Item = U>,
        F: Fn(T) -> S + 'a,
    {
        // Stage the user closure against this location's context.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMapStreamBlocking {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
647
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Identity flat-map: each element is itself the stream to flatten.
        self.flat_map_stream_blocking(q!(|d| d))
    }
657
658 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
659 /// `f`, preserving the order of the elements.
660 ///
661 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
662 /// not modify or take ownership of the values. If you need to modify the values while filtering
663 /// use [`Stream::filter_map`] instead.
664 ///
665 /// # Example
666 /// ```rust
667 /// # #[cfg(feature = "deploy")] {
668 /// # use hydro_lang::prelude::*;
669 /// # use futures::StreamExt;
670 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
671 /// process
672 /// .source_iter(q!(vec![1, 2, 3, 4]))
673 /// .filter(q!(|&x| x > 2))
674 /// # }, |mut stream| async move {
675 /// // 3, 4
676 /// # for w in (3..5) {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// # }
681 /// ```
682 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
683 where
684 F: Fn(&T) -> bool + 'a,
685 {
686 let f = f.splice_fn1_borrow_ctx(&self.location).into();
687 Stream::new(
688 self.location.clone(),
689 HydroNode::Filter {
690 f,
691 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692 metadata: self.location.new_node_metadata(Self::collection_kind()),
693 },
694 )
695 }
696
697 /// Splits the stream into two streams based on a predicate, without cloning elements.
698 ///
699 /// Elements for which `f` returns `true` are sent to the first output stream,
700 /// and elements for which `f` returns `false` are sent to the second output stream.
701 ///
702 /// Unlike using `filter` twice, this only evaluates the predicate once per element
703 /// and does not require `T: Clone`.
704 ///
705 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
706 /// the predicate is only used for routing; the element itself is moved to the
707 /// appropriate output stream.
708 ///
709 /// # Example
710 /// ```rust
711 /// # #[cfg(feature = "deploy")] {
712 /// # use hydro_lang::prelude::*;
713 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
714 /// # use futures::StreamExt;
715 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
716 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
717 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
718 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
719 /// evens.map(q!(|x| (x, true)))
720 /// .merge_unordered(odds.map(q!(|x| (x, false))))
721 /// # }, |mut stream| async move {
722 /// # let mut results = Vec::new();
723 /// # for _ in 0..6 {
724 /// # results.push(stream.next().await.unwrap());
725 /// # }
726 /// # results.sort();
727 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
728 /// # }));
729 /// # }
730 /// ```
731 #[expect(
732 clippy::type_complexity,
733 reason = "return type mirrors the input stream type"
734 )]
735 pub fn partition<F>(
736 self,
737 f: impl IntoQuotedMut<'a, F, L>,
738 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
739 where
740 F: Fn(&T) -> bool + 'a,
741 {
742 let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
743 let shared = SharedNode(Rc::new(RefCell::new(
744 self.ir_node.replace(HydroNode::Placeholder),
745 )));
746
747 let true_stream = Stream::new(
748 self.location.clone(),
749 HydroNode::Partition {
750 inner: SharedNode(shared.0.clone()),
751 f: f.clone(),
752 is_true: true,
753 metadata: self.location.new_node_metadata(Self::collection_kind()),
754 },
755 );
756
757 let false_stream = Stream::new(
758 self.location.clone(),
759 HydroNode::Partition {
760 inner: SharedNode(shared.0),
761 f,
762 is_true: false,
763 metadata: self.location.new_node_metadata(Self::collection_kind()),
764 },
765 );
766
767 (true_stream, false_stream)
768 }
769
770 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
771 ///
772 /// # Example
773 /// ```rust
774 /// # #[cfg(feature = "deploy")] {
775 /// # use hydro_lang::prelude::*;
776 /// # use futures::StreamExt;
777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778 /// process
779 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
780 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
781 /// # }, |mut stream| async move {
782 /// // 1, 2
783 /// # for w in (1..3) {
784 /// # assert_eq!(stream.next().await.unwrap(), w);
785 /// # }
786 /// # }));
787 /// # }
788 /// ```
789 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
790 where
791 F: Fn(T) -> Option<U> + 'a,
792 {
793 let f = f.splice_fn1_ctx(&self.location).into();
794 Stream::new(
795 self.location.clone(),
796 HydroNode::FilterMap {
797 f,
798 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
799 metadata: self
800 .location
801 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
802 },
803 )
804 }
805
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both sides must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
851
    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
    /// let batch_first_tick = process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    /// .source_iter(q!(vec![5, 6, 7, 8]))
    /// .batch(&tick, nondet!(/** test */))
    /// .defer_tick();
    /// batch_first_tick.chain(batch_second_tick)
    /// .filter_if(signal)
    /// .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Reduce the signal to an optional that is present only when `true`;
        // cross with it and drop the signal value from each pair.
        self.cross_singleton(signal.filter(q!(|b| *b)))
            .map(q!(|(d, _)| d))
    }
887
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    /// .source_iter(q!(vec![5, 6, 7, 8]))
    /// .batch(&tick, nondet!(/** test */))
    /// .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    /// .filter_if_some(some_on_first_tick)
    /// .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(signal.is_some())
    }
926
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_none(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Delegates to `filter_if` on the boolean absence of `other`.
        self.filter_if(other.is_none())
    }
965
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
    /// returning all tupled pairs.
    ///
    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
    /// symmetric hash join is used and ordering is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![1, 2]));
    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
        self,
        other: Stream<T2, L, B2, O2, R>,
    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
    where
        T: Clone,
        T2: Clone,
    {
        // Tag every element on both sides with the unit key `()` so that `join`
        // matches every left element with every right element, then drop the key.
        self.map(q!(|v| ((), v)))
            .join(other.map(q!(|v| ((), v))))
            .map(q!(|((), (v1, v2))| (v1, v2)))
    }
1002
1003 /// Takes one stream as input and filters out any duplicate occurrences. The output
1004 /// contains all unique values from the input.
1005 ///
1006 /// # Example
1007 /// ```rust
1008 /// # #[cfg(feature = "deploy")] {
1009 /// # use hydro_lang::prelude::*;
1010 /// # use futures::StreamExt;
1011 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1012 /// let tick = process.tick();
1013 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1014 /// # }, |mut stream| async move {
1015 /// # for w in vec![1, 2, 3, 4] {
1016 /// # assert_eq!(stream.next().await.unwrap(), w);
1017 /// # }
1018 /// # }));
1019 /// # }
1020 /// ```
1021 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1022 where
1023 T: Eq + Hash,
1024 {
1025 Stream::new(
1026 self.location.clone(),
1027 HydroNode::Unique {
1028 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1029 metadata: self
1030 .location
1031 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1032 },
1033 )
1034 }
1035
1036 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1037 ///
1038 /// The `other` stream must be [`Bounded`], since this function will wait until
1039 /// all its elements are available before producing any output.
1040 /// # Example
1041 /// ```rust
1042 /// # #[cfg(feature = "deploy")] {
1043 /// # use hydro_lang::prelude::*;
1044 /// # use futures::StreamExt;
1045 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046 /// let tick = process.tick();
1047 /// let stream = process
1048 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1049 /// .batch(&tick, nondet!(/** test */));
1050 /// let batch = process
1051 /// .source_iter(q!(vec![1, 2]))
1052 /// .batch(&tick, nondet!(/** test */));
1053 /// stream.filter_not_in(batch).all_ticks()
1054 /// # }, |mut stream| async move {
1055 /// # for w in vec![3, 4] {
1056 /// # assert_eq!(stream.next().await.unwrap(), w);
1057 /// # }
1058 /// # }));
1059 /// # }
1060 /// ```
1061 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1062 where
1063 T: Eq + Hash,
1064 B2: IsBounded,
1065 {
1066 check_matching_location(&self.location, &other.location);
1067
1068 Stream::new(
1069 self.location.clone(),
1070 HydroNode::Difference {
1071 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1072 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1073 metadata: self
1074 .location
1075 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1076 },
1077 )
1078 }
1079
1080 /// An operator which allows you to "inspect" each element of a stream without
1081 /// modifying it. The closure `f` is called on a reference to each item. This is
1082 /// mainly useful for debugging, and should not be used to generate side-effects.
1083 ///
1084 /// # Example
1085 /// ```rust
1086 /// # #[cfg(feature = "deploy")] {
1087 /// # use hydro_lang::prelude::*;
1088 /// # use futures::StreamExt;
1089 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1090 /// let nums = process.source_iter(q!(vec![1, 2]));
1091 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1092 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1093 /// # }, |mut stream| async move {
1094 /// # for w in vec![1, 2] {
1095 /// # assert_eq!(stream.next().await.unwrap(), w);
1096 /// # }
1097 /// # }));
1098 /// # }
1099 /// ```
1100 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1101 where
1102 F: Fn(&T) + 'a,
1103 {
1104 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1105
1106 Stream::new(
1107 self.location.clone(),
1108 HydroNode::Inspect {
1109 f,
1110 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1111 metadata: self.location.new_node_metadata(Self::collection_kind()),
1112 },
1113 )
1114 }
1115
1116 /// Executes the provided closure for every element in this stream.
1117 ///
1118 /// Because the closure may have side effects, the stream must have deterministic order
1119 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1120 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1121 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1122 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1123 where
1124 O: IsOrdered,
1125 R: IsExactlyOnce,
1126 {
1127 let f = f.splice_fn1_ctx(&self.location).into();
1128 self.location
1129 .flow_state()
1130 .borrow_mut()
1131 .push_root(HydroRoot::ForEach {
1132 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1133 f,
1134 op_metadata: HydroIrOpMetadata::new(),
1135 });
1136 }
1137
1138 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1139 /// TCP socket to some other server. You should _not_ use this API for interacting with
1140 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1141 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1142 /// interaction with asynchronous sinks.
1143 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1144 where
1145 O: IsOrdered,
1146 R: IsExactlyOnce,
1147 S: 'a + futures::Sink<T> + Unpin,
1148 {
1149 self.location
1150 .flow_state()
1151 .borrow_mut()
1152 .push_root(HydroRoot::DestSink {
1153 sink: sink.splice_typed_ctx(&self.location).into(),
1154 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155 op_metadata: HydroIrOpMetadata::new(),
1156 });
1157 }
1158
    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
    ///
    /// Requires a deterministically ordered ([`TotalOrder`]) input with no retries
    /// ([`ExactlyOnce`]), since the index assigned to each element depends on the exact
    /// order and cardinality of arrivals.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): metadata is built for the concrete `TotalOrder` /
                // `ExactlyOnce` markers, while the return type carries the generic
                // `O` / `R`. Consistent only if `IsOrdered` / `IsExactlyOnce` are
                // implemented solely by those markers — confirm.
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }
1197
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> Singleton<A, L, B2>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneStream<M, B2>,
    {
        // Splice the staged closures into expressions bound to this stream's location,
        // and record the caller-supplied commutativity/idempotence proof for `comb`.
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The registered algebra justifies relaxing the order/retry markers here.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
        };

        Singleton::new(ordered_etc.location.clone(), core)
    }
1251
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the staged combinator and record its commutativity/idempotence proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // The registered algebra justifies relaxing the order/retry markers here.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location.clone(), core)
    }
1299
1300 /// Computes the maximum element in the stream as an [`Optional`], which
1301 /// will be empty until the first element in the input arrives.
1302 ///
1303 /// # Example
1304 /// ```rust
1305 /// # #[cfg(feature = "deploy")] {
1306 /// # use hydro_lang::prelude::*;
1307 /// # use futures::StreamExt;
1308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1309 /// let tick = process.tick();
1310 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1311 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1312 /// batch.max().all_ticks()
1313 /// # }, |mut stream| async move {
1314 /// // 4
1315 /// # assert_eq!(stream.next().await.unwrap(), 4);
1316 /// # }));
1317 /// # }
1318 /// ```
1319 pub fn max(self) -> Optional<T, L, B>
1320 where
1321 T: Ord,
1322 {
1323 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1324 .assume_ordering_trusted_bounded::<TotalOrder>(
1325 nondet!(/** max is commutative, but order affects intermediates */),
1326 )
1327 .reduce(q!(|curr, new| {
1328 if new > *curr {
1329 *curr = new;
1330 }
1331 }))
1332 }
1333
1334 /// Computes the minimum element in the stream as an [`Optional`], which
1335 /// will be empty until the first element in the input arrives.
1336 ///
1337 /// # Example
1338 /// ```rust
1339 /// # #[cfg(feature = "deploy")] {
1340 /// # use hydro_lang::prelude::*;
1341 /// # use futures::StreamExt;
1342 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1343 /// let tick = process.tick();
1344 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1345 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1346 /// batch.min().all_ticks()
1347 /// # }, |mut stream| async move {
1348 /// // 1
1349 /// # assert_eq!(stream.next().await.unwrap(), 1);
1350 /// # }));
1351 /// # }
1352 /// ```
1353 pub fn min(self) -> Optional<T, L, B>
1354 where
1355 T: Ord,
1356 {
1357 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1358 .assume_ordering_trusted_bounded::<TotalOrder>(
1359 nondet!(/** max is commutative, but order affects intermediates */),
1360 )
1361 .reduce(q!(|curr, new| {
1362 if new < *curr {
1363 *curr = new;
1364 }
1365 }))
1366 }
1367
    /// Computes the first element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the first element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.first().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
            // `Generate::Return` emits the first element and stops consuming input,
            // so the generator produces a stream with at most one element.
            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
            // With at most one element, a no-op reduce just surfaces it as an `Optional`.
            .reduce(q!(|_, _| {}))
    }
1399
1400 /// Computes the last element in the stream as an [`Optional`], which
1401 /// will be empty until an element in the input arrives.
1402 ///
1403 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1404 /// re-ordering of elements may cause the last element to change.
1405 ///
1406 /// # Example
1407 /// ```rust
1408 /// # #[cfg(feature = "deploy")] {
1409 /// # use hydro_lang::prelude::*;
1410 /// # use futures::StreamExt;
1411 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1412 /// let tick = process.tick();
1413 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1414 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1415 /// batch.last().all_ticks()
1416 /// # }, |mut stream| async move {
1417 /// // 4
1418 /// # assert_eq!(stream.next().await.unwrap(), 4);
1419 /// # }));
1420 /// # }
1421 /// ```
1422 pub fn last(self) -> Optional<T, L, B>
1423 where
1424 O: IsOrdered,
1425 {
1426 self.make_totally_ordered()
1427 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1428 .reduce(q!(|curr, new| *curr = new))
1429 }
1430
    /// Returns a stream containing at most the first `n` elements of the input stream,
    /// preserving the original order. Similar to `LIMIT` in SQL.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
    /// retries, since the result depends on the order and cardinality of elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
    /// numbers.limit(q!(3))
    /// # }, |mut stream| async move {
    /// // 10, 20, 30
    /// # for w in vec![10, 20, 30] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        // State machine over a running count of emitted elements.
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    // `n` elements already emitted; in practice only reachable when
                    // `n == 0`, since hitting the count below terminates via `Return`.
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        // Emit the final element and stop consuming input.
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1477
1478 /// Collects all the elements of this stream into a single [`Vec`] element.
1479 ///
1480 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1481 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1482 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1483 /// the vector at an arbitrary point in time.
1484 ///
1485 /// # Example
1486 /// ```rust
1487 /// # #[cfg(feature = "deploy")] {
1488 /// # use hydro_lang::prelude::*;
1489 /// # use futures::StreamExt;
1490 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1491 /// let tick = process.tick();
1492 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1493 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1494 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1495 /// # }, |mut stream| async move {
1496 /// // [ vec![1, 2, 3, 4] ]
1497 /// # for w in vec![vec![1, 2, 3, 4]] {
1498 /// # assert_eq!(stream.next().await.unwrap(), w);
1499 /// # }
1500 /// # }));
1501 /// # }
1502 /// ```
1503 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1504 where
1505 O: IsOrdered,
1506 R: IsExactlyOnce,
1507 {
1508 self.make_totally_ordered().make_exactly_once().fold(
1509 q!(|| vec![]),
1510 q!(|acc, v| {
1511 acc.push(v);
1512 }),
1513 )
1514 }
1515
    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
    /// and emitting each intermediate result.
    ///
    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values. The scan operation can also terminate early
    /// by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the stream is terminated and no more elements are processed.
    ///
    /// # Examples
    ///
    /// Basic usage - running sum:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         Some(*acc)
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    ///
    /// Early termination example:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 1),
    ///     q!(|state, x| {
    ///         *state = *state * x;
    ///         if *state > 6 {
    ///             None // Terminate the stream
    ///         } else {
    ///             Some(-*state)
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: -1, -2, -6
    /// # for w in vec![-1, -2, -6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Option<U> + 'a,
    {
        // Splice the staged closures into expressions bound to this stream's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Scan {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1602
    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
    /// stream, maintaining an internal state (accumulator) and emitting the values returned
    /// by the function.
    ///
    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
    /// future. The future is polled to completion. If it resolves to `Some`, the value is
    /// emitted. If it resolves to `None`, the item is filtered out.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .scan_async_blocking(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             let val = *acc;
    ///             async move { Some(val) }
    ///         }),
    ///     )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan_async_blocking<A, U, I, F, Fut>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Fut + 'a,
        Fut: Future<Output = Option<U>> + 'a,
    {
        // Splice the staged closures into expressions bound to this stream's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        // NOTE(review): per the docs above, a `None` result filters out the item rather
        // than terminating the stream (unlike [`Stream::scan`], where `None` terminates)
        // — confirm against the `ScanAsyncBlocking` executor semantics.
        Stream::new(
            self.location.clone(),
            HydroNode::ScanAsyncBlocking {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1663
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the staged closures so they can be spliced against the location context
        // later, inside the larger `scan` closure built below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // Normalize the ordering/retry markers to concrete `TotalOrder` / `ExactlyOnce`.
        let this = self.make_totally_ordered().make_exactly_once();

        // The generator is lowered to a `Scan` whose accumulator returns
        // `Option<Option<U>>`: the outer `None` terminates the scan, the inner
        // `Option<U>` is the (possibly absent) element to emit for this input.
        //
        // State is Option<Option<A>>:
        // None = not yet initialized
        // Some(Some(a)) = active with state a
        // Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                // Lazily run the user's initializer on the first element.
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the inner `Option<U>`: `Some(out)` emits `out`, `None` emits nothing.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1776
1777 /// Given a time interval, returns a stream corresponding to samples taken from the
1778 /// stream roughly at that interval. The output will have elements in the same order
1779 /// as the input, but with arbitrary elements skipped between samples. There is also
1780 /// no guarantee on the exact timing of the samples.
1781 ///
1782 /// # Non-Determinism
1783 /// The output stream is non-deterministic in which elements are sampled, since this
1784 /// is controlled by a clock.
1785 pub fn sample_every(
1786 self,
1787 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1788 nondet: NonDet,
1789 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1790 where
1791 L: NoTick + NoAtomic,
1792 {
1793 let samples = self.location.source_interval(interval, nondet);
1794
1795 let tick = self.location.tick();
1796 self.batch(&tick, nondet)
1797 .filter_if(samples.batch(&tick, nondet).first().is_some())
1798 .all_ticks()
1799 .weaken_retries()
1800 }
1801
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent element (`None` until one
        // arrives). `Instant::now()` makes this fold inherently non-deterministic,
        // hence the manual commutativity proof.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Sample the latest-arrival time each tick: emit `()` when no element has ever
        // arrived, or when the last arrival is older than `duration`; otherwise stay
        // empty. `latest()` carries the per-tick result out as an unbounded Optional.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1846
1847 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1848 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1849 ///
1850 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1851 /// processed before an acknowledgement is emitted.
1852 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1853 let id = self.location.flow_state().borrow_mut().next_clock_id();
1854 let out_location = Atomic {
1855 tick: Tick {
1856 id,
1857 l: self.location.clone(),
1858 },
1859 };
1860 Stream::new(
1861 out_location.clone(),
1862 HydroNode::BeginAtomic {
1863 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1864 metadata: out_location
1865 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1866 },
1867 )
1868 }
1869
1870 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1871 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1872 /// the order of the input. The output stream will execute in the [`Tick`] that was
1873 /// used to create the atomic section.
1874 ///
1875 /// # Non-Determinism
1876 /// The batch boundaries are non-deterministic and may change across executions.
1877 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1878 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1879 Stream::new(
1880 tick.clone(),
1881 HydroNode::Batch {
1882 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1883 metadata: tick
1884 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1885 },
1886 )
1887 }
1888
1889 /// An operator which allows you to "name" a `HydroNode`.
1890 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1891 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1892 {
1893 let mut node = self.ir_node.borrow_mut();
1894 let metadata = node.metadata_mut();
1895 metadata.tag = Some(name.to_owned());
1896 }
1897 self
1898 }
1899
    /// Turns this [`Stream`] into an [`Optional`], under the invariant assumption that there is at
    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
    /// so uses must be carefully vetted.
    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
    where
        B: IsBounded,
    {
        // Pure re-typing: a `Cast` node changes the collection kind without any
        // runtime check that the at-most-one invariant actually holds.
        Optional::new(
            self.location.clone(),
            HydroNode::Cast {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
            },
        )
    }
1917
1918 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1919 if O::ORDERING_KIND == O2::ORDERING_KIND {
1920 Stream::new(
1921 self.location.clone(),
1922 self.ir_node.replace(HydroNode::Placeholder),
1923 )
1924 } else {
1925 panic!(
1926 "Runtime ordering {:?} did not match requested cast {:?}.",
1927 O::ORDERING_KIND,
1928 O2::ORDERING_KIND
1929 )
1930 }
1931 }
1932
    /// Explicitly "casts" the stream to a type with a different ordering
    /// guarantee. Useful in unsafe code where the ordering cannot be proven
    /// by the type-system.
    ///
    /// # Non-Determinism
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided ordering guarantee will propagate into the guarantees
    /// for the rest of the program.
    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
        if O::ORDERING_KIND == O2::ORDERING_KIND {
            // Same ordering kind: pure re-typing, no IR change needed.
            self.use_ordering_type()
        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
            // We can always weaken the ordering guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        } else {
            // Strengthening the ordering cannot be checked, so record an
            // untrusted non-determinism observation in the IR.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        }
    }
1968
1969 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1970 // intermediate states will not be revealed
1971 fn assume_ordering_trusted_bounded<O2: Ordering>(
1972 self,
1973 nondet: NonDet,
1974 ) -> Stream<T, L, B, O2, R> {
1975 if B::BOUNDED {
1976 self.assume_ordering_trusted(nondet)
1977 } else {
1978 self.assume_ordering(nondet)
1979 }
1980 }
1981
1982 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1983 // is not observable
1984 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1985 self,
1986 _nondet: NonDet,
1987 ) -> Stream<T, L, B, O2, R> {
1988 if O::ORDERING_KIND == O2::ORDERING_KIND {
1989 Stream::new(
1990 self.location.clone(),
1991 self.ir_node.replace(HydroNode::Placeholder),
1992 )
1993 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1994 // We can always weaken the ordering guarantee
1995 Stream::new(
1996 self.location.clone(),
1997 HydroNode::Cast {
1998 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1999 metadata: self
2000 .location
2001 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2002 },
2003 )
2004 } else {
2005 Stream::new(
2006 self.location.clone(),
2007 HydroNode::ObserveNonDet {
2008 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2009 trusted: true,
2010 metadata: self
2011 .location
2012 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2013 },
2014 )
2015 }
2016 }
2017
2018 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2019 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2020 /// which is always safe because that is the weakest possible guarantee.
2021 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2022 self.weaken_ordering::<NoOrder>()
2023 }
2024
2025 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2026 /// enforcing that `O2` is weaker than the input ordering guarantee.
2027 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2028 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2029 self.assume_ordering::<O2>(nondet)
2030 }
2031
2032 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2033 /// implies that `O == TotalOrder`.
2034 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2035 where
2036 O: IsOrdered,
2037 {
2038 self.assume_ordering(nondet!(/** no-op */))
2039 }
2040
    /// Explicitly "casts" the stream to a type with a different retries
    /// guarantee. Useful in unsafe code where the lack of retries cannot
    /// be proven by the type-system.
    ///
    /// # Non-Determinism
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided retries guarantee will propagate into the guarantees
    /// for the rest of the program.
    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            // Same retries kind: pure re-typing, no IR change needed.
            Stream::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            // Strengthening the retries guarantee cannot be checked, so record an
            // untrusted non-determinism observation in the IR.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }
2079
    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
    // is not observable
    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            // Same retries kind: pure re-typing, no IR change needed.
            Stream::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            // Strengthening the retries guarantee: record the non-determinism as
            // trusted, since callers of this private API have vetted that it is
            // unobservable (contrast `assume_retries`, which marks it untrusted).
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: true,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }
2112
2113 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2114 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2115 /// which is always safe because that is the weakest possible guarantee.
2116 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2117 self.weaken_retries::<AtLeastOnce>()
2118 }
2119
2120 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2121 /// enforcing that `R2` is weaker than the input retries guarantee.
2122 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2123 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2124 self.assume_retries::<R2>(nondet)
2125 }
2126
2127 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2128 /// implies that `R == ExactlyOnce`.
2129 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2130 where
2131 R: IsExactlyOnce,
2132 {
2133 self.assume_retries(nondet!(/** no-op */))
2134 }
2135
2136 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2137 /// implies that `B == Bounded`.
2138 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2139 where
2140 B: IsBounded,
2141 {
2142 self.weaken_boundedness()
2143 }
2144
    /// Casts the boundedness guarantee of this stream to an arbitrary boundedness `B2`.
    /// When `B` and `B2` agree this is a pure re-typing; otherwise a `Cast` node is
    /// inserted into the IR.
    ///
    /// NOTE(review): the previous doc claimed "given that `B: IsBounded`", but no such
    /// bound is declared on this method, so an `Unbounded -> Bounded` cast is also
    /// expressible here — confirm whether that direction is intended to be reachable
    /// (callers like `make_bounded` do enforce `B: IsBounded` themselves).
    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
        if B::BOUNDED == B2::BOUNDED {
            Stream::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // We can always weaken the boundedness
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
                },
            )
        }
    }
2166}
2167
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// Converts a stream of references `&T` into a stream of owned `T` values,
    /// preserving the boundedness, ordering, and retries guarantees.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
2196
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
        // Counting ignores element values, so reordering the input can affect
        // neither the final count nor any intermediate count.
        self.assume_ordering_trusted::<TotalOrder>(nondet!(
            /// Order does not affect eventual count, and also does not affect intermediate states.
        ))
        .fold(
            q!(|| 0usize),
            q!(
                |count, _| *count += 1,
                monotone = manual_proof!(/** += 1 is monotone */)
            ),
        )
    }
}
2232
impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
    /// Produces a new stream that merges the elements of the two input streams.
    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn merge_unordered<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Compiled as a `Chain` node; because both inputs are unbounded, the
        // interleaving across them is not guaranteed, hence the `NoOrder` output.
        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    Unbounded,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Deprecated: use [`Stream::merge_unordered`] instead.
    #[deprecated(note = "use `merge_unordered` instead")]
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        self.merge_unordered(other)
    }
}
2292
impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
    /// Produces a new stream that combines the elements of the two input streams,
    /// preserving the relative order of elements within each input.
    ///
    /// # Non-Determinism
    /// The order in which elements *across* the two streams will be interleaved is
    /// non-deterministic, so the order of elements will vary across runs. If the output
    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
    /// but emits an unordered stream. For deterministic first-then-second ordering on
    /// bounded streams, use [`Stream::chain`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
    /// # process.source_iter(q!(vec![1, 3])).into();
    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
    /// # }, |mut stream| async move {
    /// // 1, 3 and 2, 4 in some order, preserving the original local order
    /// # for w in vec![1, 3, 2, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn merge_ordered<R2: Retries>(
        self,
        other: Stream<T, L, B, TotalOrder, R2>,
        _nondet: NonDet,
    ) -> Stream<T, L, B, TotalOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // `MergeOrdered` keeps each input's internal order; only the cross-input
        // interleaving is left unspecified (covered by the `_nondet` witness).
        Stream::new(
            self.location.clone(),
            HydroNode::MergeOrdered {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B,
                    TotalOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
}
2345
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        B: IsBounded,
        T: Ord,
    {
        // `B: IsBounded` makes this a no-op re-typing to `Bounded`.
        let this = self.make_bounded();
        Stream::new(
            this.location.clone(),
            HydroNode::Sort {
                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                metadata: this
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
        self,
        other: Stream<T, L, B2, O2, R2>,
    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B2,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
    /// because this is compiled into a nested loop.
    ///
    /// Both inputs must be [`Bounded`] (`self` via the `B: IsBounded` bound, `other`
    /// by its type), since the nested loop must see all elements of each side.
    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
        self,
        other: Stream<T2, L, Bounded, O2, R>,
    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
    where
        B: IsBounded,
        T: Clone,
        T2: Clone,
    {
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        Stream::new(
            this.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: this.location.new_node_metadata(Stream::<
                    (T, T2),
                    L,
                    Bounded,
                    <O2 as MinOrder<O>>::Min,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pair every key with every element of `self` via a nested-loop cross
        // product, then regroup the `(key, value)` pairs into a keyed stream.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }

    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
    /// execution until all results are available. The output order is based on when futures
    /// complete, and may be different than the input order.
    ///
    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
    /// while futures are pending, this variant blocks until the futures resolve.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_blocking()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
    where
        T: Future,
    {
        // Output elements are ordered by future completion, hence `NoOrder`.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesBlocking {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let empty: Stream<i32, _, Bounded> = process
    ///     .source_iter(q!(Vec::<i32>::new()))
    ///     .batch(&tick, nondet!(/** test */));
    /// empty.is_empty().all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
    where
        B: IsBounded,
    {
        // Emptiness is checked by asking whether a first element exists at all,
        // which is unaffected by element order.
        self.make_bounded()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** is_empty intermediates unaffected by order */),
            )
            .first()
            .is_none()
    }
}
2615
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
    /// and streams the left side through, preserving the left side's ordering. When both
    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B2, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash + Clone,
        R: MinRetries<R2>,
        V1: Clone,
        V2: Clone,
    {
        check_matching_location(&self.location, &n.location);

        // A bounded right side can be fully accumulated before the left side is
        // streamed through (`JoinHalf`, order-preserving on the left); otherwise
        // fall back to a symmetric `Join`.
        let ir_node = if B2::BOUNDED {
            HydroNode::JoinHalf {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    B2::PreserveOrderIfBounded<O>,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            }
        } else {
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    B2::PreserveOrderIfBounded<O>,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            }
        };

        Stream::new(self.location.clone(), ir_node)
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2732
2733impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2734 Stream<(K, V), L, B, O, R>
2735{
2736 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2737 /// is used as the key and the second element is added to the entries associated with that key.
2738 ///
2739 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2740 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2741 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2742 /// total ordering _within_ each group but no ordering _across_ groups.
2743 ///
2744 /// # Example
2745 /// ```rust
2746 /// # #[cfg(feature = "deploy")] {
2747 /// # use hydro_lang::prelude::*;
2748 /// # use futures::StreamExt;
2749 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2750 /// process
2751 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2752 /// .into_keyed()
2753 /// # .entries()
2754 /// # }, |mut stream| async move {
2755 /// // { 1: [2, 3], 2: [4] }
2756 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2757 /// # assert_eq!(stream.next().await.unwrap(), w);
2758 /// # }
2759 /// # }));
2760 /// # }
2761 /// ```
2762 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2763 KeyedStream::new(
2764 self.location.clone(),
2765 HydroNode::Cast {
2766 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2767 metadata: self
2768 .location
2769 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2770 },
2771 )
2772 }
2773}
2774
2775impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2776where
2777 K: Eq + Hash,
2778 L: Location<'a>,
2779{
2780 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2781 /// # Example
2782 /// ```rust
2783 /// # #[cfg(feature = "deploy")] {
2784 /// # use hydro_lang::prelude::*;
2785 /// # use futures::StreamExt;
2786 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2787 /// let tick = process.tick();
2788 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2789 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2790 /// batch.keys().all_ticks()
2791 /// # }, |mut stream| async move {
2792 /// // 1, 2
2793 /// # assert_eq!(stream.next().await.unwrap(), 1);
2794 /// # assert_eq!(stream.next().await.unwrap(), 2);
2795 /// # }));
2796 /// # }
2797 /// ```
2798 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2799 self.into_keyed()
2800 .fold(
2801 q!(|| ()),
2802 q!(
2803 |_, _| {},
2804 commutative = manual_proof!(/** values are ignored */),
2805 idempotent = manual_proof!(/** values are ignored */)
2806 ),
2807 )
2808 .keys()
2809 }
2810}
2811
2812impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2813where
2814 L: Location<'a> + NoTick,
2815{
2816 /// Returns a stream corresponding to the latest batch of elements being atomically
2817 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2818 /// the order of the input.
2819 ///
2820 /// # Non-Determinism
2821 /// The batch boundaries are non-deterministic and may change across executions.
2822 pub fn batch_atomic(
2823 self,
2824 tick: &Tick<L>,
2825 _nondet: NonDet,
2826 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2827 Stream::new(
2828 tick.clone(),
2829 HydroNode::Batch {
2830 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2831 metadata: tick
2832 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2833 },
2834 )
2835 }
2836
2837 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2838 /// See [`Stream::atomic`] for more details.
2839 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2840 Stream::new(
2841 self.location.tick.l.clone(),
2842 HydroNode::EndAtomic {
2843 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2844 metadata: self
2845 .location
2846 .tick
2847 .l
2848 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2849 },
2850 )
2851 }
2852}
2853
2854impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2855where
2856 L: Location<'a> + NoTick + NoAtomic,
2857 F: Future<Output = T>,
2858{
2859 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2860 /// Future outputs are produced as available, regardless of input arrival order.
2861 ///
2862 /// # Example
2863 /// ```rust
2864 /// # #[cfg(feature = "deploy")] {
2865 /// # use std::collections::HashSet;
2866 /// # use futures::StreamExt;
2867 /// # use hydro_lang::prelude::*;
2868 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2869 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2870 /// .map(q!(|x| async move {
2871 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2872 /// x
2873 /// }))
2874 /// .resolve_futures()
2875 /// # },
2876 /// # |mut stream| async move {
2877 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2878 /// # let mut output = HashSet::new();
2879 /// # for _ in 1..10 {
2880 /// # output.insert(stream.next().await.unwrap());
2881 /// # }
2882 /// # assert_eq!(
2883 /// # output,
2884 /// # HashSet::<i32>::from_iter(1..10)
2885 /// # );
2886 /// # },
2887 /// # ));
2888 /// # }
2889 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2890 Stream::new(
2891 self.location.clone(),
2892 HydroNode::ResolveFutures {
2893 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2894 metadata: self
2895 .location
2896 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2897 },
2898 )
2899 }
2900
2901 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2902 /// Future outputs are produced in the same order as the input stream.
2903 ///
2904 /// # Example
2905 /// ```rust
2906 /// # #[cfg(feature = "deploy")] {
2907 /// # use std::collections::HashSet;
2908 /// # use futures::StreamExt;
2909 /// # use hydro_lang::prelude::*;
2910 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2911 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2912 /// .map(q!(|x| async move {
2913 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2914 /// x
2915 /// }))
2916 /// .resolve_futures_ordered()
2917 /// # },
2918 /// # |mut stream| async move {
2919 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2920 /// # let mut output = Vec::new();
2921 /// # for _ in 1..10 {
2922 /// # output.push(stream.next().await.unwrap());
2923 /// # }
2924 /// # assert_eq!(
2925 /// # output,
2926 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2927 /// # );
2928 /// # },
2929 /// # ));
2930 /// # }
2931 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2932 Stream::new(
2933 self.location.clone(),
2934 HydroNode::ResolveFuturesOrdered {
2935 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2936 metadata: self
2937 .location
2938 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2939 },
2940 )
2941 }
2942}
2943
2944impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2945where
2946 L: Location<'a>,
2947{
2948 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2949 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2950 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2951 Stream::new(
2952 self.location.outer().clone(),
2953 HydroNode::YieldConcat {
2954 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2955 metadata: self
2956 .location
2957 .outer()
2958 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2959 },
2960 )
2961 }
2962
2963 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2964 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2965 ///
2966 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2967 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2968 /// stream's [`Tick`] context.
2969 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2970 let out_location = Atomic {
2971 tick: self.location.clone(),
2972 };
2973
2974 Stream::new(
2975 out_location.clone(),
2976 HydroNode::YieldConcat {
2977 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978 metadata: out_location
2979 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2980 },
2981 )
2982 }
2983
2984 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2985 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2986 /// input.
2987 ///
2988 /// This API is particularly useful for stateful computation on batches of data, such as
2989 /// maintaining an accumulated state that is up to date with the current batch.
2990 ///
2991 /// # Example
2992 /// ```rust
2993 /// # #[cfg(feature = "deploy")] {
2994 /// # use hydro_lang::prelude::*;
2995 /// # use futures::StreamExt;
2996 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2997 /// let tick = process.tick();
2998 /// # // ticks are lazy by default, forces the second tick to run
2999 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3000 /// # let batch_first_tick = process
3001 /// # .source_iter(q!(vec![1, 2, 3, 4]))
3002 /// # .batch(&tick, nondet!(/** test */));
3003 /// # let batch_second_tick = process
3004 /// # .source_iter(q!(vec![5, 6, 7]))
3005 /// # .batch(&tick, nondet!(/** test */))
3006 /// # .defer_tick(); // appears on the second tick
3007 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3008 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3009 ///
3010 /// input.batch(&tick, nondet!(/** test */))
3011 /// .across_ticks(|s| s.count()).all_ticks()
3012 /// # }, |mut stream| async move {
3013 /// // [4, 7]
3014 /// assert_eq!(stream.next().await.unwrap(), 4);
3015 /// assert_eq!(stream.next().await.unwrap(), 7);
3016 /// # }));
3017 /// # }
3018 /// ```
3019 pub fn across_ticks<Out: BatchAtomic>(
3020 self,
3021 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3022 ) -> Out::Batched {
3023 thunk(self.all_ticks_atomic()).batched_atomic()
3024 }
3025
3026 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3027 /// always has the elements of `self` at tick `T - 1`.
3028 ///
3029 /// At tick `0`, the output stream is empty, since there is no previous tick.
3030 ///
3031 /// This operator enables stateful iterative processing with ticks, by sending data from one
3032 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3033 ///
3034 /// # Example
3035 /// ```rust
3036 /// # #[cfg(feature = "deploy")] {
3037 /// # use hydro_lang::prelude::*;
3038 /// # use futures::StreamExt;
3039 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3040 /// let tick = process.tick();
3041 /// // ticks are lazy by default, forces the second tick to run
3042 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3043 ///
3044 /// let batch_first_tick = process
3045 /// .source_iter(q!(vec![1, 2, 3, 4]))
3046 /// .batch(&tick, nondet!(/** test */));
3047 /// let batch_second_tick = process
3048 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
3049 /// .batch(&tick, nondet!(/** test */))
3050 /// .defer_tick(); // appears on the second tick
3051 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3052 ///
3053 /// changes_across_ticks.clone().filter_not_in(
3054 /// changes_across_ticks.defer_tick() // the elements from the previous tick
3055 /// ).all_ticks()
3056 /// # }, |mut stream| async move {
3057 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3058 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3059 /// # assert_eq!(stream.next().await.unwrap(), w);
3060 /// # }
3061 /// # }));
3062 /// # }
3063 /// ```
3064 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3065 Stream::new(
3066 self.location.clone(),
3067 HydroNode::DeferTick {
3068 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3069 metadata: self
3070 .location
3071 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3072 },
3073 )
3074 }
3075}
3076
3077#[cfg(test)]
3078mod tests {
3079 #[cfg(feature = "deploy")]
3080 use futures::{SinkExt, StreamExt};
3081 #[cfg(feature = "deploy")]
3082 use hydro_deploy::Deployment;
3083 #[cfg(feature = "deploy")]
3084 use serde::{Deserialize, Serialize};
3085 #[cfg(any(feature = "deploy", feature = "sim"))]
3086 use stageleft::q;
3087
3088 #[cfg(any(feature = "deploy", feature = "sim"))]
3089 use crate::compile::builder::FlowBuilder;
3090 #[cfg(feature = "deploy")]
3091 use crate::live_collections::sliced::sliced;
3092 #[cfg(feature = "deploy")]
3093 use crate::live_collections::stream::ExactlyOnce;
3094 #[cfg(feature = "sim")]
3095 use crate::live_collections::stream::NoOrder;
3096 #[cfg(any(feature = "deploy", feature = "sim"))]
3097 use crate::live_collections::stream::TotalOrder;
3098 #[cfg(any(feature = "deploy", feature = "sim"))]
3099 use crate::location::Location;
3100 #[cfg(feature = "sim")]
3101 use crate::networking::TCP;
3102 #[cfg(any(feature = "deploy", feature = "sim"))]
3103 use crate::nondet::nondet;
3104
3105 mod backtrace_chained_ops;
3106
    // Marker types naming the two processes used by the distributed test below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload sent across the network in tests; serde derives are required for bincode.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
3117
    // End-to-end deploy test: sends 0..10 from one process to another over TCP,
    // then out to an external port, and checks the values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
3152
    // `first()` on a flattened singleton batch yields one element, so `count()` is 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
3186
    // An unbounded `reduce` must accumulate across arrivals (1, then 1 + 2 = 3)
    // rather than resetting its state between inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3220
    // `cross_singleton` against a bounded fold pairs every input with the final sum (6).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3258
    // A bounded top-level `reduce` inside `sliced!` yields exactly one value (count == 1).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3296
    // `into_singleton` on a bounded reduce still yields exactly one value per slice.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3334
    // An atomic fold snapshot must replay its full total (6) in every tick's
    // cross_singleton, not just the tick in which the fold completed.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3377
    // `scan` keeps its accumulator across arrivals: emits running sums 1, then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3416
    // `enumerate` indices keep increasing across separately-sent inputs (0 then 1).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
3447
    // `unique` suppresses duplicates for the whole stream lifetime: the second `1`
    // is dropped, so `3` is the next output after it.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3483
    // The exhaustive sim must find some batch split where the first count != 3,
    // making the assert fail — hence #[should_panic].
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
3508
    // Batching a totally-ordered stream never reorders elements, in any sim schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }
3531
    // For NoOrder input the sim must explore a shuffle where a batch boundary splits
    // the values as [1, 3] then [2] — producing (1, 3) and (2, 2) — so the inner
    // panic fires in at least one instance (hence #[should_panic]).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3558
    // Counts the schedules the sim explores for 4 unordered elements: every ordered
    // set partition, i.e. sum over k of S(4, k) * k! = 75.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
3581
    // `assume_ordering` on a NoOrder batch is unsound: the sim finds an instance
    // where the observed order differs from [1, 2, 3, 4] (hence #[should_panic]).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }
3603
    // Instance count when order is observed through assume_ordering: every
    // permutation times every batch split, 4! * 2^(4 - 1) = 192.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3629
    // Snapshotting a running count: any subset of intermediate counts {0..3} may be
    // observed (2^4 instances), but the final count 4 is always included.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
3655
    // Top-level assume_ordering explores every arrival order of 3 elements: 3! = 6.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }
3677
    // A network feedback loop (v -> v + 1 for matching values, bounced via node2)
    // merged back into the input: the sim must find at least one schedule where the
    // output starts with 0, 1, 2 in order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6);
    }
3716
3717 #[cfg(feature = "sim")]
3718 #[test]
3719 fn sim_top_level_assume_ordering_cycle_back_tick() {
3720 let mut flow = FlowBuilder::new();
3721 let node = flow.process::<()>();
3722 let node2 = flow.process::<()>();
3723
3724 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3725
3726 let (complete_cycle_back, cycle_back) =
3727 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3728 let ordered = input
3729 .merge_unordered(cycle_back)
3730 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3731 complete_cycle_back.complete(
3732 ordered
3733 .clone()
3734 .batch(&node.tick(), nondet!(/** test */))
3735 .all_ticks()
3736 .map(q!(|v| v + 1))
3737 .filter(q!(|v| v % 2 == 1))
3738 .send(&node2, TCP.fail_stop().bincode())
3739 .send(&node, TCP.fail_stop().bincode()),
3740 );
3741
3742 let out_recv = ordered.sim_output();
3743
3744 let mut saw = false;
3745 let instance_count = flow.sim().exhaustive(async || {
3746 in_send.send_many_unordered([0, 2]);
3747 let out = out_recv.collect::<Vec<_>>().await;
3748
3749 if out.starts_with(&[0, 1, 2]) {
3750 saw = true;
3751 }
3752 });
3753
3754 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3755 assert_eq!(instance_count, 58);
3756 }
3757
    /// Two separate `assume_ordering` calls whose inputs are entangled (the
    /// first ordered stream feeds the second merge, and the second feeds a
    /// cycle back into the first) must each contribute their own
    /// nondeterministic orderings to the explored state space.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only the value 3 (the original 0 after `+ 3`) cycles back into the
        // first merge, via a hop through node2.
        complete_cycle_back.complete(
            foo.filter(q!(|v| *v == 3))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            // Some schedule must emit the cycled-back 3 between 0 and 1.
            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24);
    }
3802
    /// `assume_ordering` applied inside an atomic section (`atomic()` ..
    /// `end_atomic()`) with a feedback cycle: every explored schedule must
    /// observe all four elements (two external, two cycled back).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        // Feedback: 0 -> 1 and 2 -> 3 are fed back; even successors filtered.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            // 0 and 2 from outside plus the cycled-back 1 and 3.
            assert_eq!(out.len(), 4);
        });
        assert_eq!(instance_count, 22);
    }
3837
3838 #[cfg(feature = "deploy")]
3839 #[tokio::test]
3840 async fn partition_evens_odds() {
3841 let mut deployment = Deployment::new();
3842
3843 let mut flow = FlowBuilder::new();
3844 let node = flow.process::<()>();
3845 let external = flow.external::<()>();
3846
3847 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3848 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3849 let evens_port = evens.send_bincode_external(&external);
3850 let odds_port = odds.send_bincode_external(&external);
3851
3852 let nodes = flow
3853 .with_process(&node, deployment.Localhost())
3854 .with_external(&external, deployment.Localhost())
3855 .deploy(&mut deployment);
3856
3857 deployment.deploy().await.unwrap();
3858
3859 let mut evens_out = nodes.connect(evens_port).await;
3860 let mut odds_out = nodes.connect(odds_port).await;
3861
3862 deployment.start().await.unwrap();
3863
3864 let mut even_results = Vec::new();
3865 for _ in 0..3 {
3866 even_results.push(evens_out.next().await.unwrap());
3867 }
3868 even_results.sort();
3869 assert_eq!(even_results, vec![2, 4, 6]);
3870
3871 let mut odd_results = Vec::new();
3872 for _ in 0..3 {
3873 odd_results.push(odds_out.next().await.unwrap());
3874 }
3875 odd_results.sort();
3876 assert_eq!(odd_results, vec![1, 3, 5]);
3877 }
3878
    /// Regression test: an `.inspect()` whose return value is dropped must
    /// still be wired into the dataflow graph and execute its side effect.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unconsumed_inspect_still_runs() {
        use crate::deploy::DeployCrateWrapper;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // The return value of .inspect() is intentionally dropped.
        // Before the Null-root fix, this would silently do nothing.
        node.source_iter(q!(0..5))
            .inspect(q!(|x| println!("inspect: {}", x)));

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut stdout = nodes.get_process(&node).stdout();

        deployment.start().await.unwrap();

        // Capture the five printed lines; sort since this test does not
        // assert the order in which they appear on stdout.
        let mut lines = Vec::new();
        for _ in 0..5 {
            lines.push(stdout.recv().await.unwrap());
        }
        lines.sort();
        assert_eq!(
            lines,
            vec![
                "inspect: 0",
                "inspect: 1",
                "inspect: 2",
                "inspect: 3",
                "inspect: 4",
            ]
        );
    }
3920
3921 #[cfg(feature = "sim")]
3922 #[test]
3923 fn sim_limit() {
3924 let mut flow = FlowBuilder::new();
3925 let node = flow.process::<()>();
3926
3927 let (in_send, input) = node.sim_input();
3928
3929 let out_recv = input.limit(q!(3)).sim_output();
3930
3931 flow.sim().exhaustive(async || {
3932 in_send.send(1);
3933 in_send.send(2);
3934 in_send.send(3);
3935 in_send.send(4);
3936 in_send.send(5);
3937
3938 out_recv.assert_yields_only([1, 2, 3]).await;
3939 });
3940 }
3941
3942 #[cfg(feature = "sim")]
3943 #[test]
3944 fn sim_limit_zero() {
3945 let mut flow = FlowBuilder::new();
3946 let node = flow.process::<()>();
3947
3948 let (in_send, input) = node.sim_input();
3949
3950 let out_recv = input.limit(q!(0)).sim_output();
3951
3952 flow.sim().exhaustive(async || {
3953 in_send.send(1);
3954 in_send.send(2);
3955
3956 out_recv.assert_yields_only::<i32, _>([]).await;
3957 });
3958 }
3959
    /// `merge_ordered` may interleave its two inputs arbitrarily, but must
    /// preserve each input's internal order. Also checks that a genuinely
    /// interleaved schedule ([1, 3, 2, 4]) is among the explored instances
    /// and that exactly C(4, 2) = 6 interleavings are explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_merge_ordered() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let (in_send2, input2) = node.sim_input();

        let out_recv = input
            .merge_ordered(input2, nondet!(/** test */))
            .sim_output();

        let mut saw_out_of_order = false;
        let instances = flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send2.send(3);
            in_send2.send(4);

            let out = out_recv.collect::<Vec<_>>().await;

            if out == [1, 3, 2, 4] {
                saw_out_of_order = true;
            }

            // Assert ordering preservation: elements from each input must
            // appear in their original relative order.
            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
            assert_eq!(
                first_elements,
                vec![1, 2],
                "first input order violated: {:?}",
                out
            );
            assert_eq!(
                second_elements,
                vec![3, 4],
                "second input order violated: {:?}",
                out
            );

            // And all four elements must be present exactly once.
            first_elements.append(&mut second_elements);
            first_elements.sort();
            assert_eq!(first_elements, vec![1, 2, 3, 4]);
        });

        assert!(saw_out_of_order);
        assert_eq!(instances, 6);
    }
4011
4012 /// Tests that merge_ordered passes through elements when only one input
4013 /// has data.
4014 #[cfg(feature = "sim")]
4015 #[test]
4016 fn sim_merge_ordered_one_empty() {
4017 let mut flow = FlowBuilder::new();
4018 let node = flow.process::<()>();
4019
4020 let (in_send, input) = node.sim_input();
4021 let (_in_send2, input2) = node.sim_input();
4022
4023 let out_recv = input
4024 .merge_ordered(input2, nondet!(/** test */))
4025 .sim_output();
4026
4027 let instances = flow.sim().exhaustive(async || {
4028 in_send.send(1);
4029 in_send.send(2);
4030
4031 let out = out_recv.collect::<Vec<_>>().await;
4032 assert_eq!(out, vec![1, 2]);
4033 });
4034
4035 // Only one possible interleaving when one input is empty
4036 assert_eq!(instances, 1);
4037 }
4038
    /// Tests that merge_ordered correctly handles feedback cycles.
    /// An element output from merge_ordered is filtered and cycled back to
    /// one of its inputs. The one-at-a-time release must allow the cycled-back
    /// element to arrive and potentially be emitted before elements still
    /// waiting on the other input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_merge_ordered_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        // Create a forward ref for the cycle back
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();

        // merge_ordered: input (external) with cycle_back
        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));

        // Cycle back: elements equal to 1 get mapped to 10 and fed back
        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));

        let out_recv = merged.sim_output();

        // Send 1 and 2. Element 1 should cycle back as 10.
        // Valid orderings must have 1 before 10 (since 10 depends on 1).
        let mut saw_cycle_before_second = false;
        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);

            let out = out_recv.collect::<Vec<_>>().await;

            // 10 must always come after 1 (causal dependency)
            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);

            // Check if we see [1, 10, 2] — the cycled element beats the second input
            if out == [1, 10, 2] {
                saw_cycle_before_second = true;
            }

            // Regardless of schedule, all three elements appear exactly once.
            let mut sorted = out;
            sorted.sort();
            assert_eq!(sorted, vec![1, 2, 10]);
        });

        assert!(
            saw_cycle_before_second,
            "never saw the cycled element arrive before the second input element"
        );
    }
4093
    /// Tests that merge_ordered correctly interleaves when one input has a
    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
    /// element 2 should be able to appear after b's elements.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_merge_ordered_delayed() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();
        let (in_send2, input2) = node.sim_input();

        let out_recv = input
            .merge_ordered(input2, nondet!(/** test */))
            .sim_output();

        let mut saw_delayed_interleaving = false;
        flow.sim().exhaustive(async || {
            // Send 1 from a, and 3, 4 from b
            in_send.send(1);
            in_send2.send(3);
            in_send2.send(4);

            // Collect what's available so far
            let first_batch = out_recv.collect::<Vec<_>>().await;

            // Now send the delayed element 2 from a
            in_send.send(2);
            let second_batch = out_recv.collect::<Vec<_>>().await;

            let mut all: Vec<_> = first_batch
                .iter()
                .chain(second_batch.iter())
                .copied()
                .collect();

            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
            if all == [1, 3, 4, 2] {
                saw_delayed_interleaving = true;
            }

            // Every schedule must still deliver all four elements exactly once.
            all.sort();
            assert_eq!(all, vec![1, 2, 3, 4]);
        });

        assert!(saw_delayed_interleaving);
    }
4141
4142 /// Deploy test: merge_ordered with a delayed element on one input.
4143 /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4144 /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4145 /// both inputs are pulled and the delayed element arrives later.
4146 #[cfg(feature = "deploy")]
4147 #[tokio::test]
4148 async fn deploy_merge_ordered_delayed() {
4149 let mut deployment = Deployment::new();
4150
4151 let mut flow = FlowBuilder::new();
4152 let node = flow.process::<()>();
4153 let external = flow.external::<()>();
4154
4155 let (input_a_port, input_a) = node.source_external_bincode(&external);
4156 let (input_b_port, input_b) = node.source_external_bincode(&external);
4157
4158 let out = input_a
4159 .assume_ordering(nondet!(/** test */))
4160 .merge_ordered(
4161 input_b.assume_ordering(nondet!(/** test */)),
4162 nondet!(/** test */),
4163 )
4164 .send_bincode_external(&external);
4165
4166 let nodes = flow
4167 .with_process(&node, deployment.Localhost())
4168 .with_external(&external, deployment.Localhost())
4169 .deploy(&mut deployment);
4170
4171 deployment.deploy().await.unwrap();
4172
4173 let mut ext_a = nodes.connect(input_a_port).await;
4174 let mut ext_b = nodes.connect(input_b_port).await;
4175 let mut ext_out = nodes.connect(out).await;
4176
4177 deployment.start().await.unwrap();
4178
4179 // Send a=1, b=3, b=4
4180 ext_a.send(1).await.unwrap();
4181 ext_b.send(3).await.unwrap();
4182 ext_b.send(4).await.unwrap();
4183
4184 // Collect the first 3 elements
4185 let mut received = Vec::new();
4186 for _ in 0..3 {
4187 received.push(ext_out.next().await.unwrap());
4188 }
4189
4190 // Now send the delayed a=2
4191 ext_a.send(2).await.unwrap();
4192 received.push(ext_out.next().await.unwrap());
4193
4194 // All elements should be present
4195 received.sort();
4196 assert_eq!(received, vec![1, 2, 3, 4]);
4197 }
4198
    /// A `fold` annotated with a manual monotonicity proof can feed
    /// `threshold_greater_or_equal`. The running sum over 1..=6 grows
    /// monotonically, so the threshold against the singleton 7 eventually
    /// fires; the asserted output value matches the threshold bound.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn monotone_fold_threshold() {
        use crate::properties::manual_proof;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let in_unbounded: super::Stream<_, _> =
            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
        // The `monotone` annotation is what makes this fold usable with
        // threshold_greater_or_equal below.
        let sum = in_unbounded.fold(
            q!(|| 0),
            q!(
                |sum, v| {
                    *sum += v;
                },
                monotone = manual_proof!(/** test */)
            ),
        );

        let threshold_out = sum
            .threshold_greater_or_equal(node.singleton(q!(7)))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut threshold_out = nodes.connect(threshold_out).await;

        deployment.start().await.unwrap();

        // NOTE(review): the emitted value equals the threshold bound (7), as
        // in the sibling count test — confirm against the operator's docs.
        assert_eq!(threshold_out.next().await.unwrap(), 7);
    }
4239
4240 #[cfg(feature = "deploy")]
4241 #[tokio::test]
4242 async fn monotone_count_threshold() {
4243 let mut deployment = Deployment::new();
4244
4245 let mut flow = FlowBuilder::new();
4246 let node = flow.process::<()>();
4247 let external = flow.external::<()>();
4248
4249 let in_unbounded: super::Stream<_, _> =
4250 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4251 let sum = in_unbounded.count();
4252
4253 let threshold_out = sum
4254 .threshold_greater_or_equal(node.singleton(q!(3)))
4255 .send_bincode_external(&external);
4256
4257 let nodes = flow
4258 .with_process(&node, deployment.Localhost())
4259 .with_external(&external, deployment.Localhost())
4260 .deploy(&mut deployment);
4261
4262 deployment.deploy().await.unwrap();
4263
4264 let mut threshold_out = nodes.connect(threshold_out).await;
4265
4266 deployment.start().await.unwrap();
4267
4268 assert_eq!(threshold_out.next().await.unwrap(), 3);
4269 }
4270
    /// A `map` annotated as order-preserving must carry the monotonicity of
    /// its input through, so the doubled running sum can still drive
    /// `threshold_greater_or_equal`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn monotone_map_order_preserving_threshold() {
        use crate::properties::manual_proof;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let in_unbounded: super::Stream<_, _> =
            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
        // Monotone running sum, as in monotone_fold_threshold.
        let sum = in_unbounded.fold(
            q!(|| 0),
            q!(
                |sum, v| {
                    *sum += v;
                },
                monotone = manual_proof!(/** test */)
            ),
        );

        // map with order_preserving should preserve monotonicity
        let doubled = sum.map(q!(
            |v| v * 2,
            order_preserving = manual_proof!(/** doubling preserves order */)
        ));

        let threshold_out = doubled
            .threshold_greater_or_equal(node.singleton(q!(14)))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut threshold_out = nodes.connect(threshold_out).await;

        deployment.start().await.unwrap();

        // 14 is twice the bound used by monotone_fold_threshold.
        assert_eq!(threshold_out.next().await.unwrap(), 14);
    }
4317
    // === Compile-time type tests for join/cross_product ordering ===

    /// Each function here is never called; it exists only so the compiler
    /// verifies that `join`/`cross_product` infer the expected output
    /// boundedness and ordering markers for each input combination.
    #[cfg(any(feature = "deploy", feature = "sim"))]
    mod join_ordering_type_tests {
        use crate::live_collections::boundedness::{Bounded, Unbounded};
        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
        use crate::location::{Location, Process};

        // Unbounded x Bounded join keeps TotalOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        // Unbounded x Unbounded join degrades to NoOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        // Bounded x Bounded join keeps TotalOrder, for any location type.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        // NoOrder inputs stay NoOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_noorder_with_bounded<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        // === Compile-time type tests for cross_product ordering ===

        // Unbounded x Bounded cross product keeps TotalOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        // Bounded x Bounded cross product keeps TotalOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_bounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        // Unbounded x Unbounded cross product degrades to NoOrder.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.cross_product(right)
        }
    } // mod join_ordering_type_tests
4384
4385 // === Runtime correctness tests for bounded join/cross_product ===
4386
    /// Runtime check that a cross product of an unbounded stream with a
    /// bounded (batched-then-replayed) stream yields the full cartesian
    /// product under every explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn cross_product_mixed_boundedness_correctness() {
        use stageleft::q;

        use crate::compile::builder::FlowBuilder;
        use crate::nondet::nondet;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let tick = process.tick();

        let left = process.source_iter(q!(vec![1, 2]));
        // Route the right side through a tick to give it a bounded phase.
        let right = process
            .source_iter(q!(vec!['a', 'b']))
            .batch(&tick, nondet!(/** test */))
            .all_ticks();

        let out = left.cross_product(right).sim_output();

        flow.sim().exhaustive(async || {
            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
                .await;
        });
    }
4412
    /// Runtime check that a join of an unbounded stream with a bounded
    /// (batched-then-replayed) stream matches every key pair under every
    /// explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn join_mixed_boundedness_correctness() {
        use stageleft::q;

        use crate::compile::builder::FlowBuilder;
        use crate::nondet::nondet;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let tick = process.tick();

        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
        // Route the right side through a tick to give it a bounded phase.
        let right = process
            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
            .batch(&tick, nondet!(/** test */))
            .all_ticks();

        let out = left.join(right).sim_output();

        flow.sim().exhaustive(async || {
            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
                .await;
        });
    }
4438}