hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds fix how [`MinOrder`] combines orderings: the minimum of an ordering
/// with itself or with [`TotalOrder`] is that ordering, and the minimum with [`NoOrder`] is
/// always [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants and cannot be constructed; it is only used as a type-level marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants and cannot be constructed; it is only used as a type-level marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}

/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the minimum of `O` and `O2`
// is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}

/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the identity element: its minimum with any ordering `O` is `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is absorbing: its minimum with any ordering is `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds fix how [`MinRetries`] combines guarantees: the minimum of a retry
/// guarantee with itself or with [`ExactlyOnce`] is that guarantee, and the minimum with
/// [`AtLeastOnce`] is always [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants and cannot be constructed; it is only used as a type-level marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants and cannot be constructed; it is only used as a type-level marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}

/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the minimum of `R` and `R2`
// is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}

/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees. It is required to be weaker than (or equal to)
    /// both inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the identity element: its minimum with any guarantee `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is absorbing: its minimum with any guarantee is `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound by APIs that require a totally-ordered input stream. When the bound is not
/// met, the custom `on_unimplemented` diagnostic above is reported.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps the compiler from pointing at this impl in trait-bound errors.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}

#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound by APIs that require each element to appear exactly once. When the bound is
/// not met, the custom `on_unimplemented` diagnostic above is reported.
pub trait IsExactlyOnce: Retries {}

// `do_not_recommend` keeps the compiler from pointing at this impl in trait-bound errors.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// Location where this stream is materialized.
    pub(crate) location: Loc,
    /// IR node producing this stream. Operators that consume the stream take it out with
    /// `replace(HydroNode::Placeholder)`, leaving a placeholder behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow-building state, cloned from `location.flow_state()` on construction. `Drop` uses it
    /// to register a `HydroRoot::Null` root when the IR node was neither taken out nor shared.
    pub(crate) flow_state: FlowState,

    /// Carries the type parameters, which otherwise only appear at the type level.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244 for Stream<T, L, B, NoOrder, R>
245where
246 L: Location<'a>,
247{
248 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249 stream.weaken_ordering()
250 }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254 for Stream<T, L, B, O, AtLeastOnce>
255where
256 L: Location<'a>,
257{
258 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259 stream.weaken_retries()
260 }
261}
262
/// Exposes `Stream::defer_tick` through the [`DeferTick`] trait for bounded streams inside a
/// tick.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Path-style call: inherent items take precedence over trait items, so this targets the
        // inherent `defer_tick` rather than recursing into this impl. Assumes an inherent
        // method exists for this type (it is defined outside this view).
        Stream::defer_tick(self)
    }
}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 type Location = Tick<L>;
278
279 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280 Stream::new(
281 location.clone(),
282 HydroNode::CycleSource {
283 cycle_id,
284 metadata: location.new_node_metadata(Self::collection_kind()),
285 },
286 )
287 }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291 for Stream<T, Tick<L>, Bounded, O, R>
292where
293 L: Location<'a>,
294{
295 type Location = Tick<L>;
296
297 fn location(&self) -> &Self::Location {
298 self.location()
299 }
300
301 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
302 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
303 location.clone(),
304 HydroNode::DeferTick {
305 input: Box::new(HydroNode::CycleSource {
306 cycle_id,
307 metadata: location.new_node_metadata(Self::collection_kind()),
308 }),
309 metadata: location.new_node_metadata(Self::collection_kind()),
310 },
311 );
312
313 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
314 }
315}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318 for Stream<T, Tick<L>, Bounded, O, R>
319where
320 L: Location<'a>,
321{
322 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323 assert_eq!(
324 Location::id(&self.location),
325 expected_location,
326 "locations do not match"
327 );
328 self.location
329 .flow_state()
330 .borrow_mut()
331 .push_root(HydroRoot::CycleSink {
332 cycle_id,
333 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334 op_metadata: HydroIrOpMetadata::new(),
335 });
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340 for Stream<T, L, B, O, R>
341where
342 L: Location<'a>,
343{
344 type Location = L;
345
346 fn create_source(cycle_id: CycleId, location: L) -> Self {
347 Stream::new(
348 location.clone(),
349 HydroNode::CycleSource {
350 cycle_id,
351 metadata: location.new_node_metadata(Self::collection_kind()),
352 },
353 )
354 }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358 for Stream<T, L, B, O, R>
359where
360 L: Location<'a>,
361{
362 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363 assert_eq!(
364 Location::id(&self.location),
365 expected_location,
366 "locations do not match"
367 );
368 self.location
369 .flow_state()
370 .borrow_mut()
371 .push_root(HydroRoot::CycleSink {
372 cycle_id,
373 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374 op_metadata: HydroIrOpMetadata::new(),
375 });
376 }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
380where
381 T: Clone,
382 L: Location<'a>,
383{
384 fn clone(&self) -> Self {
385 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
386 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
387 *self.ir_node.borrow_mut() = HydroNode::Tee {
388 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
389 metadata: self.location.new_node_metadata(Self::collection_kind()),
390 };
391 }
392
393 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
394 unreachable!()
395 };
396 Stream {
397 location: self.location.clone(),
398 flow_state: self.flow_state.clone(),
399 ir_node: HydroNode::Tee {
400 inner: SharedNode(inner.0.clone()),
401 metadata: metadata.clone(),
402 }
403 .into(),
404 _phantom: PhantomData,
405 }
406 }
407}
408
409impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
410where
411 L: Location<'a>,
412{
413 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
414 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
415 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
416
417 let flow_state = location.flow_state().clone();
418 Stream {
419 location,
420 flow_state,
421 ir_node: RefCell::new(ir_node),
422 _phantom: PhantomData,
423 }
424 }
425
426 /// Returns the [`Location`] where this stream is being materialized.
427 pub fn location(&self) -> &L {
428 &self.location
429 }
430
431 /// Weakens the consistency of this live collection to not guarantee any consistency across
432 /// cluster members (if this collection is on a cluster).
433 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
434 where
435 L: Location<'a>,
436 {
437 if L::consistency()
438 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
439 {
440 // already no consistency
441 Stream::new(
442 self.location.drop_consistency(),
443 self.ir_node.replace(HydroNode::Placeholder),
444 )
445 } else {
446 Stream::new(
447 self.location.drop_consistency(),
448 HydroNode::Cast {
449 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
451 T,
452 L::DropConsistency,
453 B,
454 O,
455 R,
456 >::collection_kind(
457 )),
458 },
459 )
460 }
461 }
462
463 /// Casts this live collection to have the consistency guarantees specified in the given
464 /// location type parameter. The developer must ensure that the strengthened consistency
465 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
466 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
467 self,
468 _proof: impl crate::properties::ConsistencyProof,
469 ) -> Stream<T, L2, B, O, R>
470 where
471 L: Location<'a>,
472 {
473 if L::consistency() == L2::consistency() {
474 Stream::new(
475 self.location.with_consistency_of(),
476 self.ir_node.replace(HydroNode::Placeholder),
477 )
478 } else {
479 Stream::new(
480 self.location.with_consistency_of(),
481 HydroNode::AssertIsConsistent {
482 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483 trusted: false,
484 metadata: self
485 .location
486 .clone()
487 .with_consistency_of::<L2>()
488 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
489 },
490 )
491 }
492 }
493
494 pub(crate) fn assert_has_consistency_of_trusted<
495 L2: Location<'a, DropConsistency = L::DropConsistency>,
496 >(
497 self,
498 _proof: impl crate::properties::ConsistencyProof,
499 ) -> Stream<T, L2, B, O, R>
500 where
501 L: Location<'a>,
502 {
503 if L::consistency() == L2::consistency() {
504 Stream::new(
505 self.location.with_consistency_of(),
506 self.ir_node.replace(HydroNode::Placeholder),
507 )
508 } else {
509 Stream::new(
510 self.location.with_consistency_of(),
511 HydroNode::AssertIsConsistent {
512 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513 trusted: true,
514 metadata: self
515 .location
516 .clone()
517 .with_consistency_of::<L2>()
518 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
519 },
520 )
521 }
522 }
523
524 pub(crate) fn collection_kind() -> CollectionKind {
525 CollectionKind::Stream {
526 bound: B::BOUND_KIND,
527 order: O::ORDERING_KIND,
528 retry: R::RETRIES_KIND,
529 element_type: quote_type::<T>().into(),
530 }
531 }
532
533 /// Produces a stream based on invoking `f` on each element.
534 /// If you do not want to modify the stream and instead only want to view
535 /// each item use [`Stream::inspect`] instead.
536 ///
537 /// # Example
538 /// ```rust
539 /// # #[cfg(feature = "deploy")] {
540 /// # use hydro_lang::prelude::*;
541 /// # use futures::StreamExt;
542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543 /// let words = process.source_iter(q!(vec!["hello", "world"]));
544 /// words.map(q!(|x| x.to_uppercase()))
545 /// # }, |mut stream| async move {
546 /// # for w in vec!["HELLO", "WORLD"] {
547 /// # assert_eq!(stream.next().await.unwrap(), w);
548 /// # }
549 /// # }));
550 /// # }
551 /// ```
552 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
553 where
554 F: Fn(T) -> U + 'a,
555 {
556 let f = crate::singleton_ref::with_singleton_capture(|| {
557 f.splice_fn1_ctx(&self.location).into()
558 });
559 Stream::new(
560 self.location.clone(),
561 HydroNode::Map {
562 f,
563 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
564 metadata: self
565 .location
566 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
567 },
568 )
569 }
570
571 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
572 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
573 /// for the output type `U` must produce items in a **deterministic** order.
574 ///
575 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
576 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
577 ///
578 /// # Example
579 /// ```rust
580 /// # #[cfg(feature = "deploy")] {
581 /// # use hydro_lang::prelude::*;
582 /// # use futures::StreamExt;
583 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
584 /// process
585 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
586 /// .flat_map_ordered(q!(|x| x))
587 /// # }, |mut stream| async move {
588 /// // 1, 2, 3, 4
589 /// # for w in (1..5) {
590 /// # assert_eq!(stream.next().await.unwrap(), w);
591 /// # }
592 /// # }));
593 /// # }
594 /// ```
595 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
596 where
597 I: IntoIterator<Item = U>,
598 F: Fn(T) -> I + 'a,
599 {
600 let f = crate::singleton_ref::with_singleton_capture(|| {
601 f.splice_fn1_ctx(&self.location).into()
602 });
603 Stream::new(
604 self.location.clone(),
605 HydroNode::FlatMap {
606 f,
607 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
608 metadata: self
609 .location
610 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
611 },
612 )
613 }
614
615 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
616 /// for the output type `U` to produce items in any order.
617 ///
618 /// # Example
619 /// ```rust
620 /// # #[cfg(feature = "deploy")] {
621 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
622 /// # use futures::StreamExt;
623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
624 /// process
625 /// .source_iter(q!(vec![
626 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
627 /// std::collections::HashSet::from_iter(vec![3, 4]),
628 /// ]))
629 /// .flat_map_unordered(q!(|x| x))
630 /// # }, |mut stream| async move {
631 /// // 1, 2, 3, 4, but in no particular order
632 /// # let mut results = Vec::new();
633 /// # for w in (1..5) {
634 /// # results.push(stream.next().await.unwrap());
635 /// # }
636 /// # results.sort();
637 /// # assert_eq!(results, vec![1, 2, 3, 4]);
638 /// # }));
639 /// # }
640 /// ```
641 pub fn flat_map_unordered<U, I, F>(
642 self,
643 f: impl IntoQuotedMut<'a, F, L>,
644 ) -> Stream<U, L, B, NoOrder, R>
645 where
646 I: IntoIterator<Item = U>,
647 F: Fn(T) -> I + 'a,
648 {
649 let f = crate::singleton_ref::with_singleton_capture(|| {
650 f.splice_fn1_ctx(&self.location).into()
651 });
652 Stream::new(
653 self.location.clone(),
654 HydroNode::FlatMap {
655 f,
656 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657 metadata: self
658 .location
659 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
660 },
661 )
662 }
663
664 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
665 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
666 ///
667 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
668 /// not deterministic, use [`Stream::flatten_unordered`] instead.
669 ///
670 /// ```rust
671 /// # #[cfg(feature = "deploy")] {
672 /// # use hydro_lang::prelude::*;
673 /// # use futures::StreamExt;
674 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
675 /// process
676 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
677 /// .flatten_ordered()
678 /// # }, |mut stream| async move {
679 /// // 1, 2, 3, 4
680 /// # for w in (1..5) {
681 /// # assert_eq!(stream.next().await.unwrap(), w);
682 /// # }
683 /// # }));
684 /// # }
685 /// ```
686 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
687 where
688 T: IntoIterator<Item = U>,
689 {
690 self.flat_map_ordered(q!(|d| d))
691 }
692
693 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
694 /// for the element type `T` to produce items in any order.
695 ///
696 /// # Example
697 /// ```rust
698 /// # #[cfg(feature = "deploy")] {
699 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
700 /// # use futures::StreamExt;
701 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
702 /// process
703 /// .source_iter(q!(vec![
704 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
705 /// std::collections::HashSet::from_iter(vec![3, 4]),
706 /// ]))
707 /// .flatten_unordered()
708 /// # }, |mut stream| async move {
709 /// // 1, 2, 3, 4, but in no particular order
710 /// # let mut results = Vec::new();
711 /// # for w in (1..5) {
712 /// # results.push(stream.next().await.unwrap());
713 /// # }
714 /// # results.sort();
715 /// # assert_eq!(results, vec![1, 2, 3, 4]);
716 /// # }));
717 /// # }
718 /// ```
719 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
720 where
721 T: IntoIterator<Item = U>,
722 {
723 self.flat_map_unordered(q!(|d| d))
724 }
725
726 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
727 /// then emit the elements of that stream one by one. When the inner stream yields
728 /// `Pending`, this operator yields as well.
729 pub fn flat_map_stream_blocking<U, S, F>(
730 self,
731 f: impl IntoQuotedMut<'a, F, L>,
732 ) -> Stream<U, L, B, O, R>
733 where
734 S: futures::Stream<Item = U>,
735 F: Fn(T) -> S + 'a,
736 {
737 let f = f.splice_fn1_ctx(&self.location).into();
738 Stream::new(
739 self.location.clone(),
740 HydroNode::FlatMapStreamBlocking {
741 f,
742 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
743 metadata: self
744 .location
745 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
746 },
747 )
748 }
749
750 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
751 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
752 /// yields as well.
753 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
754 where
755 T: futures::Stream<Item = U>,
756 {
757 self.flat_map_stream_blocking(q!(|d| d))
758 }
759
760 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
761 /// `f`, preserving the order of the elements.
762 ///
763 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
764 /// not modify or take ownership of the values. If you need to modify the values while filtering
765 /// use [`Stream::filter_map`] instead.
766 ///
767 /// # Example
768 /// ```rust
769 /// # #[cfg(feature = "deploy")] {
770 /// # use hydro_lang::prelude::*;
771 /// # use futures::StreamExt;
772 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773 /// process
774 /// .source_iter(q!(vec![1, 2, 3, 4]))
775 /// .filter(q!(|&x| x > 2))
776 /// # }, |mut stream| async move {
777 /// // 3, 4
778 /// # for w in (3..5) {
779 /// # assert_eq!(stream.next().await.unwrap(), w);
780 /// # }
781 /// # }));
782 /// # }
783 /// ```
784 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
785 where
786 F: Fn(&T) -> bool + 'a,
787 {
788 let f = crate::singleton_ref::with_singleton_capture(|| {
789 f.splice_fn1_borrow_ctx(&self.location).into()
790 });
791 Stream::new(
792 self.location.clone(),
793 HydroNode::Filter {
794 f,
795 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796 metadata: self.location.new_node_metadata(Self::collection_kind()),
797 },
798 )
799 }
800
801 /// Splits the stream into two streams based on a predicate, without cloning elements.
802 ///
803 /// Elements for which `f` returns `true` are sent to the first output stream,
804 /// and elements for which `f` returns `false` are sent to the second output stream.
805 ///
806 /// Unlike using `filter` twice, this only evaluates the predicate once per element
807 /// and does not require `T: Clone`.
808 ///
809 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
810 /// the predicate is only used for routing; the element itself is moved to the
811 /// appropriate output stream.
812 ///
813 /// # Example
814 /// ```rust
815 /// # #[cfg(feature = "deploy")] {
816 /// # use hydro_lang::prelude::*;
817 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
818 /// # use futures::StreamExt;
819 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
820 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
821 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
822 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
823 /// evens.map(q!(|x| (x, true)))
824 /// .merge_unordered(odds.map(q!(|x| (x, false))))
825 /// # }, |mut stream| async move {
826 /// # let mut results = Vec::new();
827 /// # for _ in 0..6 {
828 /// # results.push(stream.next().await.unwrap());
829 /// # }
830 /// # results.sort();
831 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
832 /// # }));
833 /// # }
834 /// ```
835 #[expect(
836 clippy::type_complexity,
837 reason = "return type mirrors the input stream type"
838 )]
839 pub fn partition<F>(
840 self,
841 f: impl IntoQuotedMut<'a, F, L>,
842 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
843 where
844 F: Fn(&T) -> bool + 'a,
845 {
846 let f = crate::singleton_ref::with_singleton_capture(|| {
847 f.splice_fn1_borrow_ctx(&self.location).into()
848 });
849 let shared = SharedNode(Rc::new(RefCell::new(
850 self.ir_node.replace(HydroNode::Placeholder),
851 )));
852
853 let true_stream = Stream::new(
854 self.location.clone(),
855 HydroNode::Partition {
856 inner: SharedNode(shared.0.clone()),
857 f: f.clone(),
858 is_true: true,
859 metadata: self.location.new_node_metadata(Self::collection_kind()),
860 },
861 );
862
863 let false_stream = Stream::new(
864 self.location.clone(),
865 HydroNode::Partition {
866 inner: SharedNode(shared.0),
867 f,
868 is_true: false,
869 metadata: self.location.new_node_metadata(Self::collection_kind()),
870 },
871 );
872
873 (true_stream, false_stream)
874 }
875
876 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// process
885 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
886 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
887 /// # }, |mut stream| async move {
888 /// // 1, 2
889 /// # for w in (1..3) {
890 /// # assert_eq!(stream.next().await.unwrap(), w);
891 /// # }
892 /// # }));
893 /// # }
894 /// ```
895 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
896 where
897 F: Fn(T) -> Option<U> + 'a,
898 {
899 let f = f.splice_fn1_ctx(&self.location).into();
900 Stream::new(
901 self.location.clone(),
902 HydroNode::FilterMap {
903 f,
904 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
905 metadata: self
906 .location
907 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
908 },
909 )
910 }
911
912 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
913 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
914 /// If `other` is an empty [`Optional`], no values will be produced.
915 ///
916 /// # Example
917 /// ```rust
918 /// # #[cfg(feature = "deploy")] {
919 /// # use hydro_lang::prelude::*;
920 /// # use futures::StreamExt;
921 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922 /// let tick = process.tick();
923 /// let batch = process
924 /// .source_iter(q!(vec![1, 2, 3, 4]))
925 /// .batch(&tick, nondet!(/** test */));
926 /// let count = batch.clone().count(); // `count()` returns a singleton
927 /// batch.cross_singleton(count).all_ticks()
928 /// # }, |mut stream| async move {
929 /// // (1, 4), (2, 4), (3, 4), (4, 4)
930 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
931 /// # assert_eq!(stream.next().await.unwrap(), w);
932 /// # }
933 /// # }));
934 /// # }
935 /// ```
936 pub fn cross_singleton<O2>(
937 self,
938 other: impl Into<Optional<O2, L, Bounded>>,
939 ) -> Stream<(T, O2), L, B, O, R>
940 where
941 O2: Clone,
942 {
943 let other: Optional<O2, L, Bounded> = other.into();
944 check_matching_location(&self.location, &other.location);
945
946 Stream::new(
947 self.location.clone(),
948 HydroNode::CrossSingleton {
949 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
950 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
951 metadata: self
952 .location
953 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
954 },
955 )
956 }
957
958 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
959 ///
960 /// # Example
961 /// ```rust
962 /// # #[cfg(feature = "deploy")] {
963 /// # use hydro_lang::prelude::*;
964 /// # use futures::StreamExt;
965 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
966 /// let tick = process.tick();
967 /// // ticks are lazy by default, forces the second tick to run
968 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
969 ///
970 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
971 /// let batch_first_tick = process
972 /// .source_iter(q!(vec![1, 2, 3, 4]))
973 /// .batch(&tick, nondet!(/** test */));
974 /// let batch_second_tick = process
975 /// .source_iter(q!(vec![5, 6, 7, 8]))
976 /// .batch(&tick, nondet!(/** test */))
977 /// .defer_tick();
978 /// batch_first_tick.chain(batch_second_tick)
979 /// .filter_if(signal)
980 /// .all_ticks()
981 /// # }, |mut stream| async move {
982 /// // [1, 2, 3, 4]
983 /// # for w in vec![1, 2, 3, 4] {
984 /// # assert_eq!(stream.next().await.unwrap(), w);
985 /// # }
986 /// # }));
987 /// # }
988 /// ```
989 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
990 self.cross_singleton(signal.filter(q!(|b| *b)))
991 .map(q!(|(d, _)| d))
992 }
993
994 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
995 ///
996 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
997 /// leader of a cluster.
998 ///
999 /// # Example
1000 /// ```rust
1001 /// # #[cfg(feature = "deploy")] {
1002 /// # use hydro_lang::prelude::*;
1003 /// # use futures::StreamExt;
1004 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1005 /// let tick = process.tick();
1006 /// // ticks are lazy by default, forces the second tick to run
1007 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1008 ///
1009 /// let batch_first_tick = process
1010 /// .source_iter(q!(vec![1, 2, 3, 4]))
1011 /// .batch(&tick, nondet!(/** test */));
1012 /// let batch_second_tick = process
1013 /// .source_iter(q!(vec![5, 6, 7, 8]))
1014 /// .batch(&tick, nondet!(/** test */))
1015 /// .defer_tick(); // appears on the second tick
1016 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1017 /// batch_first_tick.chain(batch_second_tick)
1018 /// .filter_if_some(some_on_first_tick)
1019 /// .all_ticks()
1020 /// # }, |mut stream| async move {
1021 /// // [1, 2, 3, 4]
1022 /// # for w in vec![1, 2, 3, 4] {
1023 /// # assert_eq!(stream.next().await.unwrap(), w);
1024 /// # }
1025 /// # }));
1026 /// # }
1027 /// ```
1028 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1029 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1030 self.filter_if(signal.is_some())
1031 }
1032
1033 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1034 ///
1035 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1036 /// some local state.
1037 ///
1038 /// # Example
1039 /// ```rust
1040 /// # #[cfg(feature = "deploy")] {
1041 /// # use hydro_lang::prelude::*;
1042 /// # use futures::StreamExt;
1043 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044 /// let tick = process.tick();
1045 /// // ticks are lazy by default, forces the second tick to run
1046 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047 ///
1048 /// let batch_first_tick = process
1049 /// .source_iter(q!(vec![1, 2, 3, 4]))
1050 /// .batch(&tick, nondet!(/** test */));
1051 /// let batch_second_tick = process
1052 /// .source_iter(q!(vec![5, 6, 7, 8]))
1053 /// .batch(&tick, nondet!(/** test */))
1054 /// .defer_tick(); // appears on the second tick
1055 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1056 /// batch_first_tick.chain(batch_second_tick)
1057 /// .filter_if_none(some_on_first_tick)
1058 /// .all_ticks()
1059 /// # }, |mut stream| async move {
1060 /// // [5, 6, 7, 8]
1061 /// # for w in vec![5, 6, 7, 8] {
1062 /// # assert_eq!(stream.next().await.unwrap(), w);
1063 /// # }
1064 /// # }));
1065 /// # }
1066 /// ```
1067 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1068 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1069 self.filter_if(other.is_none())
1070 }
1071
1072 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1073 /// returning all tupled pairs.
1074 ///
1075 /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1076 /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1077 /// symmetric hash join is used and ordering is [`NoOrder`].
1078 ///
1079 /// # Example
1080 /// ```rust
1081 /// # #[cfg(feature = "deploy")] {
1082 /// # use hydro_lang::prelude::*;
1083 /// # use std::collections::HashSet;
1084 /// # use futures::StreamExt;
1085 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1086 /// let tick = process.tick();
1087 /// let stream1 = process.source_iter(q!(vec![1, 2]));
1088 /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1089 /// stream1.cross_product(stream2)
1090 /// # }, |mut stream| async move {
1091 /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1092 /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1093 /// # stream.map(|i| assert!(expected.contains(&i)));
1094 /// # }));
1095 /// # }
1096 pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
1097 self,
1098 other: Stream<T2, L, B2, O2, R>,
1099 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
1100 where
1101 T: Clone,
1102 T2: Clone,
1103 {
1104 self.map(q!(|v| ((), v)))
1105 .join(other.map(q!(|v| ((), v))))
1106 .map(q!(|((), (v1, v2))| (v1, v2)))
1107 }
1108
1109 /// Takes one stream as input and filters out any duplicate occurrences. The output
1110 /// contains all unique values from the input.
1111 ///
1112 /// # Example
1113 /// ```rust
1114 /// # #[cfg(feature = "deploy")] {
1115 /// # use hydro_lang::prelude::*;
1116 /// # use futures::StreamExt;
1117 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118 /// let tick = process.tick();
1119 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1120 /// # }, |mut stream| async move {
1121 /// # for w in vec![1, 2, 3, 4] {
1122 /// # assert_eq!(stream.next().await.unwrap(), w);
1123 /// # }
1124 /// # }));
1125 /// # }
1126 /// ```
1127 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1128 where
1129 T: Eq + Hash,
1130 {
1131 Stream::new(
1132 self.location.clone(),
1133 HydroNode::Unique {
1134 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1135 metadata: self
1136 .location
1137 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1138 },
1139 )
1140 }
1141
1142 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1143 ///
1144 /// The `other` stream must be [`Bounded`], since this function will wait until
1145 /// all its elements are available before producing any output.
1146 /// # Example
1147 /// ```rust
1148 /// # #[cfg(feature = "deploy")] {
1149 /// # use hydro_lang::prelude::*;
1150 /// # use futures::StreamExt;
1151 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1152 /// let tick = process.tick();
1153 /// let stream = process
1154 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1155 /// .batch(&tick, nondet!(/** test */));
1156 /// let batch = process
1157 /// .source_iter(q!(vec![1, 2]))
1158 /// .batch(&tick, nondet!(/** test */));
1159 /// stream.filter_not_in(batch).all_ticks()
1160 /// # }, |mut stream| async move {
1161 /// # for w in vec![3, 4] {
1162 /// # assert_eq!(stream.next().await.unwrap(), w);
1163 /// # }
1164 /// # }));
1165 /// # }
1166 /// ```
1167 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1168 where
1169 T: Eq + Hash,
1170 B2: IsBounded,
1171 {
1172 check_matching_location(&self.location, &other.location);
1173
1174 Stream::new(
1175 self.location.clone(),
1176 HydroNode::Difference {
1177 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1178 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1179 metadata: self
1180 .location
1181 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1182 },
1183 )
1184 }
1185
1186 /// An operator which allows you to "inspect" each element of a stream without
1187 /// modifying it. The closure `f` is called on a reference to each item. This is
1188 /// mainly useful for debugging, and should not be used to generate side-effects.
1189 ///
1190 /// # Example
1191 /// ```rust
1192 /// # #[cfg(feature = "deploy")] {
1193 /// # use hydro_lang::prelude::*;
1194 /// # use futures::StreamExt;
1195 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1196 /// let nums = process.source_iter(q!(vec![1, 2]));
1197 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1198 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1199 /// # }, |mut stream| async move {
1200 /// # for w in vec![1, 2] {
1201 /// # assert_eq!(stream.next().await.unwrap(), w);
1202 /// # }
1203 /// # }));
1204 /// # }
1205 /// ```
1206 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1207 where
1208 F: Fn(&T) + 'a,
1209 {
1210 let f = crate::singleton_ref::with_singleton_capture(|| {
1211 f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1212 .into()
1213 });
1214
1215 Stream::new(
1216 self.location.clone(),
1217 HydroNode::Inspect {
1218 f,
1219 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1220 metadata: self.location.new_node_metadata(Self::collection_kind()),
1221 },
1222 )
1223 }
1224
1225 /// Executes the provided closure for every element in this stream.
1226 ///
1227 /// Because the closure may have side effects, the stream must have deterministic order
1228 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1229 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1230 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1231 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1232 where
1233 O: IsOrdered,
1234 R: IsExactlyOnce,
1235 {
1236 let f = f.splice_fn1_ctx(&self.location).into();
1237 self.location
1238 .flow_state()
1239 .borrow_mut()
1240 .push_root(HydroRoot::ForEach {
1241 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242 f,
1243 op_metadata: HydroIrOpMetadata::new(),
1244 });
1245 }
1246
1247 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1248 /// TCP socket to some other server. You should _not_ use this API for interacting with
1249 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1250 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1251 /// interaction with asynchronous sinks.
1252 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1253 where
1254 O: IsOrdered,
1255 R: IsExactlyOnce,
1256 S: 'a + futures::Sink<T> + Unpin,
1257 {
1258 self.location
1259 .flow_state()
1260 .borrow_mut()
1261 .push_root(HydroRoot::DestSink {
1262 sink: sink.splice_typed_ctx(&self.location).into(),
1263 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1264 op_metadata: HydroIrOpMetadata::new(),
1265 });
1266 }
1267
1268 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1269 ///
1270 /// # Example
1271 /// ```rust
1272 /// # #[cfg(feature = "deploy")] {
1273 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1274 /// # use futures::StreamExt;
1275 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1276 /// let tick = process.tick();
1277 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278 /// numbers.enumerate()
1279 /// # }, |mut stream| async move {
1280 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1281 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1282 /// # assert_eq!(stream.next().await.unwrap(), w);
1283 /// # }
1284 /// # }));
1285 /// # }
1286 /// ```
1287 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1288 where
1289 O: IsOrdered,
1290 R: IsExactlyOnce,
1291 {
1292 Stream::new(
1293 self.location.clone(),
1294 HydroNode::Enumerate {
1295 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1296 metadata: self.location.new_node_metadata(Stream::<
1297 (usize, T),
1298 L,
1299 B,
1300 TotalOrder,
1301 ExactlyOnce,
1302 >::collection_kind()),
1303 },
1304 )
1305 }
1306
1307 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1308 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1309 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1310 ///
1311 /// Depending on the input stream guarantees, the closure may need to be commutative
1312 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1313 ///
1314 /// # Example
1315 /// ```rust
1316 /// # #[cfg(feature = "deploy")] {
1317 /// # use hydro_lang::prelude::*;
1318 /// # use futures::StreamExt;
1319 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1320 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1321 /// words
1322 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1323 /// .into_stream()
1324 /// # }, |mut stream| async move {
1325 /// // "HELLOWORLD"
1326 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1327 /// # }));
1328 /// # }
1329 /// ```
1330 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1331 self,
1332 init: impl IntoQuotedMut<'a, I, L>,
1333 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1334 ) -> Singleton<A, L, B2>
1335 where
1336 I: Fn() -> A + 'a,
1337 F: Fn(&mut A, T),
1338 C: ValidCommutativityFor<O>,
1339 Idemp: ValidIdempotenceFor<R>,
1340 B: ApplyMonotoneStream<M, B2>,
1341 {
1342 let init = init.splice_fn0_ctx(&self.location).into();
1343 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1344 proof.register_proof(&comb);
1345
1346 // Only assume_retries (for idempotence), not assume_ordering.
1347 // The fold hook in the simulator handles ordering non-determinism directly.
1348 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1349 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1350
1351 let core = HydroNode::Fold {
1352 init,
1353 acc: comb.into(),
1354 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1355 metadata: retried
1356 .location
1357 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1358 // we do not guarantee consistency at this point because if the algebraic properties
1359 // do not hold in practice, replica consistency may fail to be maintained, so we
1360 // would like the simulator to assert consistency; in the future, this will be dynamic
1361 // based on the proof mechanism
1362 };
1363
1364 Singleton::new(retried.location.clone(), core)
1365 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1366 }
1367
1368 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1369 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1370 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1371 /// reference, so that it can be modified in place.
1372 ///
1373 /// Depending on the input stream guarantees, the closure may need to be commutative
1374 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1375 ///
1376 /// # Example
1377 /// ```rust
1378 /// # #[cfg(feature = "deploy")] {
1379 /// # use hydro_lang::prelude::*;
1380 /// # use futures::StreamExt;
1381 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1382 /// let bools = process.source_iter(q!(vec![false, true, false]));
1383 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1384 /// # }, |mut stream| async move {
1385 /// // true
1386 /// # assert_eq!(stream.next().await.unwrap(), true);
1387 /// # }));
1388 /// # }
1389 /// ```
1390 pub fn reduce<F, C, Idemp>(
1391 self,
1392 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1393 ) -> Optional<T, L, B>
1394 where
1395 F: Fn(&mut T, T) + 'a,
1396 C: ValidCommutativityFor<O>,
1397 Idemp: ValidIdempotenceFor<R>,
1398 {
1399 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1400 proof.register_proof(&f);
1401
1402 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1403 let ordered_etc: Stream<T, L::DropConsistency, B> =
1404 self.assume_retries(nondet).assume_ordering(nondet);
1405
1406 let core = HydroNode::Reduce {
1407 f: f.into(),
1408 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1409 metadata: ordered_etc
1410 .location
1411 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1412 };
1413
1414 Optional::new(ordered_etc.location.clone(), core)
1415 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1416 }
1417
1418 /// Computes the maximum element in the stream as an [`Optional`], which
1419 /// will be empty until the first element in the input arrives.
1420 ///
1421 /// # Example
1422 /// ```rust
1423 /// # #[cfg(feature = "deploy")] {
1424 /// # use hydro_lang::prelude::*;
1425 /// # use futures::StreamExt;
1426 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1427 /// let tick = process.tick();
1428 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1429 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1430 /// batch.max().all_ticks()
1431 /// # }, |mut stream| async move {
1432 /// // 4
1433 /// # assert_eq!(stream.next().await.unwrap(), 4);
1434 /// # }));
1435 /// # }
1436 /// ```
1437 pub fn max(self) -> Optional<T, L, B>
1438 where
1439 T: Ord,
1440 {
1441 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1442 .assume_ordering_trusted_bounded::<TotalOrder>(
1443 nondet!(/** max is commutative, but order affects intermediates */),
1444 )
1445 .reduce(q!(|curr, new| {
1446 if new > *curr {
1447 *curr = new;
1448 }
1449 }))
1450 }
1451
1452 /// Computes the minimum element in the stream as an [`Optional`], which
1453 /// will be empty until the first element in the input arrives.
1454 ///
1455 /// # Example
1456 /// ```rust
1457 /// # #[cfg(feature = "deploy")] {
1458 /// # use hydro_lang::prelude::*;
1459 /// # use futures::StreamExt;
1460 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461 /// let tick = process.tick();
1462 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1463 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1464 /// batch.min().all_ticks()
1465 /// # }, |mut stream| async move {
1466 /// // 1
1467 /// # assert_eq!(stream.next().await.unwrap(), 1);
1468 /// # }));
1469 /// # }
1470 /// ```
1471 pub fn min(self) -> Optional<T, L, B>
1472 where
1473 T: Ord,
1474 {
1475 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1476 .assume_ordering_trusted_bounded::<TotalOrder>(
1477 nondet!(/** max is commutative, but order affects intermediates */),
1478 )
1479 .reduce(q!(|curr, new| {
1480 if new < *curr {
1481 *curr = new;
1482 }
1483 }))
1484 }
1485
1486 /// Computes the first element in the stream as an [`Optional`], which
1487 /// will be empty until the first element in the input arrives.
1488 ///
1489 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1490 /// re-ordering of elements may cause the first element to change.
1491 ///
1492 /// # Example
1493 /// ```rust
1494 /// # #[cfg(feature = "deploy")] {
1495 /// # use hydro_lang::prelude::*;
1496 /// # use futures::StreamExt;
1497 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1498 /// let tick = process.tick();
1499 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1500 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1501 /// batch.first().all_ticks()
1502 /// # }, |mut stream| async move {
1503 /// // 1
1504 /// # assert_eq!(stream.next().await.unwrap(), 1);
1505 /// # }));
1506 /// # }
1507 /// ```
1508 pub fn first(self) -> Optional<T, L, B>
1509 where
1510 O: IsOrdered,
1511 {
1512 self.make_totally_ordered()
1513 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1514 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1515 .reduce(q!(|_, _| {}))
1516 }
1517
1518 /// Computes the last element in the stream as an [`Optional`], which
1519 /// will be empty until an element in the input arrives.
1520 ///
1521 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1522 /// re-ordering of elements may cause the last element to change.
1523 ///
1524 /// # Example
1525 /// ```rust
1526 /// # #[cfg(feature = "deploy")] {
1527 /// # use hydro_lang::prelude::*;
1528 /// # use futures::StreamExt;
1529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1530 /// let tick = process.tick();
1531 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1532 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1533 /// batch.last().all_ticks()
1534 /// # }, |mut stream| async move {
1535 /// // 4
1536 /// # assert_eq!(stream.next().await.unwrap(), 4);
1537 /// # }));
1538 /// # }
1539 /// ```
1540 pub fn last(self) -> Optional<T, L, B>
1541 where
1542 O: IsOrdered,
1543 {
1544 self.make_totally_ordered()
1545 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1546 .reduce(q!(|curr, new| *curr = new))
1547 }
1548
1549 /// Returns a stream containing at most the first `n` elements of the input stream,
1550 /// preserving the original order. Similar to `LIMIT` in SQL.
1551 ///
1552 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1553 /// retries, since the result depends on the order and cardinality of elements.
1554 ///
1555 /// # Example
1556 /// ```rust
1557 /// # #[cfg(feature = "deploy")] {
1558 /// # use hydro_lang::prelude::*;
1559 /// # use futures::StreamExt;
1560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1562 /// numbers.limit(q!(3))
1563 /// # }, |mut stream| async move {
1564 /// // 10, 20, 30
1565 /// # for w in vec![10, 20, 30] {
1566 /// # assert_eq!(stream.next().await.unwrap(), w);
1567 /// # }
1568 /// # }));
1569 /// # }
1570 /// ```
1571 pub fn limit(
1572 self,
1573 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1574 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1575 where
1576 O: IsOrdered,
1577 R: IsExactlyOnce,
1578 {
1579 self.generator(
1580 q!(|| 0usize),
1581 q!(move |count, item| {
1582 if *count == n {
1583 Generate::Break
1584 } else {
1585 *count += 1;
1586 if *count == n {
1587 Generate::Return(item)
1588 } else {
1589 Generate::Yield(item)
1590 }
1591 }
1592 }),
1593 )
1594 }
1595
1596 /// Collects all the elements of this stream into a single [`Vec`] element.
1597 ///
1598 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1599 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1600 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1601 /// the vector at an arbitrary point in time.
1602 ///
1603 /// # Example
1604 /// ```rust
1605 /// # #[cfg(feature = "deploy")] {
1606 /// # use hydro_lang::prelude::*;
1607 /// # use futures::StreamExt;
1608 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1609 /// let tick = process.tick();
1610 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1611 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1612 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1613 /// # }, |mut stream| async move {
1614 /// // [ vec![1, 2, 3, 4] ]
1615 /// # for w in vec![vec![1, 2, 3, 4]] {
1616 /// # assert_eq!(stream.next().await.unwrap(), w);
1617 /// # }
1618 /// # }));
1619 /// # }
1620 /// ```
1621 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1622 where
1623 O: IsOrdered,
1624 R: IsExactlyOnce,
1625 {
1626 self.make_totally_ordered().make_exactly_once().fold(
1627 q!(|| vec![]),
1628 q!(|acc, v| {
1629 acc.push(v);
1630 }),
1631 )
1632 }
1633
1634 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1635 /// and emitting each intermediate result.
1636 ///
1637 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1638 /// containing all intermediate accumulated values. The scan operation can also terminate early
1639 /// by returning `None`.
1640 ///
1641 /// The function takes a mutable reference to the accumulator and the current element, and returns
1642 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1643 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1644 ///
1645 /// # Examples
1646 ///
1647 /// Basic usage - running sum:
1648 /// ```rust
1649 /// # #[cfg(feature = "deploy")] {
1650 /// # use hydro_lang::prelude::*;
1651 /// # use futures::StreamExt;
1652 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1653 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1654 /// q!(|| 0),
1655 /// q!(|acc, x| {
1656 /// *acc += x;
1657 /// Some(*acc)
1658 /// }),
1659 /// )
1660 /// # }, |mut stream| async move {
1661 /// // Output: 1, 3, 6, 10
1662 /// # for w in vec![1, 3, 6, 10] {
1663 /// # assert_eq!(stream.next().await.unwrap(), w);
1664 /// # }
1665 /// # }));
1666 /// # }
1667 /// ```
1668 ///
1669 /// Early termination example:
1670 /// ```rust
1671 /// # #[cfg(feature = "deploy")] {
1672 /// # use hydro_lang::prelude::*;
1673 /// # use futures::StreamExt;
1674 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1675 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1676 /// q!(|| 1),
1677 /// q!(|state, x| {
1678 /// *state = *state * x;
1679 /// if *state > 6 {
1680 /// None // Terminate the stream
1681 /// } else {
1682 /// Some(-*state)
1683 /// }
1684 /// }),
1685 /// )
1686 /// # }, |mut stream| async move {
1687 /// // Output: -1, -2, -6
1688 /// # for w in vec![-1, -2, -6] {
1689 /// # assert_eq!(stream.next().await.unwrap(), w);
1690 /// # }
1691 /// # }));
1692 /// # }
1693 /// ```
1694 pub fn scan<A, U, I, F>(
1695 self,
1696 init: impl IntoQuotedMut<'a, I, L>,
1697 f: impl IntoQuotedMut<'a, F, L>,
1698 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1699 where
1700 O: IsOrdered,
1701 R: IsExactlyOnce,
1702 I: Fn() -> A + 'a,
1703 F: Fn(&mut A, T) -> Option<U> + 'a,
1704 {
1705 let init = init.splice_fn0_ctx(&self.location).into();
1706 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1707
1708 Stream::new(
1709 self.location.clone(),
1710 HydroNode::Scan {
1711 init,
1712 acc: f,
1713 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1714 metadata: self.location.new_node_metadata(
1715 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1716 ),
1717 },
1718 )
1719 }
1720
1721 /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1722 /// stream, maintaining an internal state (accumulator) and emitting the values returned
1723 /// by the function.
1724 ///
1725 /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1726 /// future. The future is polled to completion. If it resolves to `Some`, the value is
1727 /// emitted. If it resolves to `None`, the item is filtered out.
1728 ///
1729 /// # Examples
1730 ///
1731 /// ```rust
1732 /// # #[cfg(feature = "deploy")] {
1733 /// # use hydro_lang::prelude::*;
1734 /// # use futures::StreamExt;
1735 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736 /// process
1737 /// .source_iter(q!(vec![1, 2, 3, 4]))
1738 /// .scan_async_blocking(
1739 /// q!(|| 0),
1740 /// q!(|acc, x| {
1741 /// *acc += x;
1742 /// let val = *acc;
1743 /// async move { Some(val) }
1744 /// }),
1745 /// )
1746 /// # }, |mut stream| async move {
1747 /// // Output: 1, 3, 6, 10
1748 /// # for w in vec![1, 3, 6, 10] {
1749 /// # assert_eq!(stream.next().await.unwrap(), w);
1750 /// # }
1751 /// # }));
1752 /// # }
1753 /// ```
1754 pub fn scan_async_blocking<A, U, I, F, Fut>(
1755 self,
1756 init: impl IntoQuotedMut<'a, I, L>,
1757 f: impl IntoQuotedMut<'a, F, L>,
1758 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1759 where
1760 O: IsOrdered,
1761 R: IsExactlyOnce,
1762 I: Fn() -> A + 'a,
1763 F: Fn(&mut A, T) -> Fut + 'a,
1764 Fut: Future<Output = Option<U>> + 'a,
1765 {
1766 let init = init.splice_fn0_ctx(&self.location).into();
1767 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1768
1769 Stream::new(
1770 self.location.clone(),
1771 HydroNode::ScanAsyncBlocking {
1772 init,
1773 acc: f,
1774 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775 metadata: self.location.new_node_metadata(
1776 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1777 ),
1778 },
1779 )
1780 }
1781
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed:
    /// - `Yield(out)` emits `out` and keeps processing inputs;
    /// - `Continue` emits nothing and keeps processing inputs;
    /// - `Return(out)` emits `out`, after which no further output is produced;
    /// - `Break` stops without emitting anything.
    ///
    /// The initializer is invoked lazily, when the first input element is processed. The
    /// input must be ordered and exactly-once (`O: IsOrdered`, `R: IsExactlyOnce`); the
    /// output is a totally ordered, exactly-once stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures so they can be spliced (by name) into the quoted scan body below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan accumulator returns Option<Option<U>>: the outer `None` ends the scan, and the
        // inner Option<U> is the (possibly empty) element emitted for this input.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            // Lazily run the user initializer on the first input.
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        // Emit `out` now, mark the state terminated; the next input hits the
                        // catch-all arm below and ends the scan.
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can end the scan itself (immediately on Break, on the
                    // following input after Return) because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the Option<U> elements produced by the scan, dropping the `None`s from `Continue`.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1894
1895 /// Given a time interval, returns a stream corresponding to samples taken from the
1896 /// stream roughly at that interval. The output will have elements in the same order
1897 /// as the input, but with arbitrary elements skipped between samples. There is also
1898 /// no guarantee on the exact timing of the samples.
1899 ///
1900 /// # Non-Determinism
1901 /// The output stream is non-deterministic in which elements are sampled, since this
1902 /// is controlled by a clock.
1903 pub fn sample_every(
1904 self,
1905 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1906 nondet: NonDet,
1907 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1908 where
1909 L: TopLevel<'a>,
1910 {
1911 let samples = self.location.source_interval(interval);
1912
1913 let tick = self.location.tick();
1914 self.batch(&tick, nondet)
1915 .filter_if(samples.batch(&tick, nondet).first().is_some())
1916 .all_ticks()
1917 .weaken_retries()
1918 }
1919
1920 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1921 /// stream has not emitted a value since that duration.
1922 ///
1923 /// # Non-Determinism
1924 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1925 /// samples take place, timeouts may be non-deterministically generated or missed,
1926 /// and the notification of the timeout may be delayed as well. There is also no
1927 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1928 /// detected based on when the next sample is taken.
1929 pub fn timeout(
1930 self,
1931 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1932 nondet: NonDet,
1933 ) -> Optional<(), L::DropConsistency, Unbounded>
1934 where
1935 L: TopLevel<'a>,
1936 {
1937 let tick = self.location.tick();
1938
1939 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1940 q!(|| None),
1941 q!(
1942 |latest, _| {
1943 *latest = Some(Instant::now());
1944 },
1945 commutative = manual_proof!(/** TODO */)
1946 ),
1947 );
1948
1949 latest_received
1950 .snapshot(&tick, nondet)
1951 .filter_map(q!(move |latest_received| {
1952 if let Some(latest_received) = latest_received {
1953 if Instant::now().duration_since(latest_received) > duration {
1954 Some(())
1955 } else {
1956 None
1957 }
1958 } else {
1959 Some(())
1960 }
1961 }))
1962 .latest()
1963 }
1964
1965 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1966 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1967 ///
1968 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1969 /// processed before an acknowledgement is emitted.
1970 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1971 let id = self.location.flow_state().borrow_mut().next_clock_id();
1972 let out_location = Atomic {
1973 tick: Tick {
1974 id,
1975 l: self.location.clone(),
1976 },
1977 };
1978 Stream::new(
1979 out_location.clone(),
1980 HydroNode::BeginAtomic {
1981 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1982 metadata: out_location
1983 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1984 },
1985 )
1986 }
1987
1988 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1989 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1990 /// the order of the input. The output stream will execute in the [`Tick`] that was
1991 /// used to create the atomic section.
1992 ///
1993 /// # Non-Determinism
1994 /// The batch boundaries are non-deterministic and may change across executions.
1995 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1996 self,
1997 tick: &Tick<L2>,
1998 _nondet: NonDet,
1999 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2000 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2001 Stream::new(
2002 tick.drop_consistency(),
2003 HydroNode::Batch {
2004 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2005 metadata: tick
2006 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2007 },
2008 )
2009 }
2010
2011 /// An operator which allows you to "name" a `HydroNode`.
2012 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2013 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2014 {
2015 let mut node = self.ir_node.borrow_mut();
2016 let metadata = node.metadata_mut();
2017 metadata.tag = Some(name.to_owned());
2018 }
2019 self
2020 }
2021
2022 /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2023 /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2024 /// so uses must be carefully vetted.
2025 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2026 where
2027 B: IsBounded,
2028 {
2029 Optional::new(
2030 self.location.clone(),
2031 HydroNode::Cast {
2032 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2033 metadata: self
2034 .location
2035 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2036 },
2037 )
2038 }
2039
2040 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2041 if O::ORDERING_KIND == O2::ORDERING_KIND {
2042 Stream::new(
2043 self.location.clone(),
2044 self.ir_node.replace(HydroNode::Placeholder),
2045 )
2046 } else {
2047 panic!(
2048 "Runtime ordering {:?} did not match requested cast {:?}.",
2049 O::ORDERING_KIND,
2050 O2::ORDERING_KIND
2051 )
2052 }
2053 }
2054
2055 /// Explicitly "casts" the stream to a type with a different ordering
2056 /// guarantee. Useful in unsafe code where the ordering cannot be proven
2057 /// by the type-system.
2058 ///
2059 /// # Non-Determinism
2060 /// This function is used as an escape hatch, and any mistakes in the
2061 /// provided ordering guarantee will propagate into the guarantees
2062 /// for the rest of the program.
2063 pub fn assume_ordering<O2: Ordering>(
2064 self,
2065 _nondet: NonDet,
2066 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2067 if O::ORDERING_KIND == O2::ORDERING_KIND {
2068 self.use_ordering_type().weaken_consistency()
2069 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2070 // We can always weaken the ordering guarantee
2071 let target_location = self.location().drop_consistency();
2072 Stream::new(
2073 target_location.clone(),
2074 HydroNode::Cast {
2075 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2076 metadata: target_location
2077 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2078 },
2079 )
2080 } else {
2081 let target_location = self.location().drop_consistency();
2082 Stream::new(
2083 target_location.clone(),
2084 HydroNode::ObserveNonDet {
2085 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2086 trusted: false,
2087 metadata: target_location
2088 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2089 },
2090 )
2091 }
2092 }
2093
2094 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2095 // intermediate states will not be revealed
2096 fn assume_ordering_trusted_bounded<O2: Ordering>(
2097 self,
2098 nondet: NonDet,
2099 ) -> Stream<T, L, B, O2, R> {
2100 if B::BOUNDED {
2101 self.assume_ordering_trusted(nondet)
2102 } else {
2103 let self_location = self.location.clone();
2104 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2105 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2106 }
2107 }
2108
2109 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2110 // is not observable
2111 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2112 self,
2113 _nondet: NonDet,
2114 ) -> Stream<T, L, B, O2, R> {
2115 if O::ORDERING_KIND == O2::ORDERING_KIND {
2116 self.use_ordering_type()
2117 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2118 // We can always weaken the ordering guarantee
2119 Stream::new(
2120 self.location.clone(),
2121 HydroNode::Cast {
2122 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2123 metadata: self
2124 .location
2125 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2126 },
2127 )
2128 } else {
2129 Stream::new(
2130 self.location.clone(),
2131 HydroNode::ObserveNonDet {
2132 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2133 trusted: true,
2134 metadata: self
2135 .location
2136 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2137 },
2138 )
2139 }
2140 }
2141
2142 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2143 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2144 /// which is always safe because that is the weakest possible guarantee.
2145 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2146 self.weaken_ordering::<NoOrder>()
2147 }
2148
2149 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2150 /// enforcing that `O2` is weaker than the input ordering guarantee.
2151 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2152 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2153 self.assume_ordering_trusted::<O2>(nondet)
2154 }
2155
2156 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2157 /// implies that `O == TotalOrder`.
2158 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2159 where
2160 O: IsOrdered,
2161 {
2162 self.assume_ordering_trusted(nondet!(/** no-op */))
2163 }
2164
2165 /// Explicitly "casts" the stream to a type with a different retries
2166 /// guarantee. Useful in unsafe code where the lack of retries cannot
2167 /// be proven by the type-system.
2168 ///
2169 /// # Non-Determinism
2170 /// This function is used as an escape hatch, and any mistakes in the
2171 /// provided retries guarantee will propagate into the guarantees
2172 /// for the rest of the program.
2173 pub fn assume_retries<R2: Retries>(
2174 self,
2175 _nondet: NonDet,
2176 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2177 if R::RETRIES_KIND == R2::RETRIES_KIND {
2178 Stream::new(
2179 self.location.drop_consistency(),
2180 self.ir_node.replace(HydroNode::Placeholder),
2181 )
2182 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2183 // We can always weaken the retries guarantee
2184 let target_location = self.location.drop_consistency();
2185 Stream::new(
2186 target_location.clone(),
2187 HydroNode::Cast {
2188 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2189 metadata: target_location
2190 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2191 },
2192 )
2193 } else {
2194 let target_location = self.location.drop_consistency();
2195 Stream::new(
2196 target_location.clone(),
2197 HydroNode::ObserveNonDet {
2198 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2199 trusted: false,
2200 metadata: target_location
2201 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2202 },
2203 )
2204 }
2205 }
2206
2207 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2208 // is not observable
2209 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2210 if R::RETRIES_KIND == R2::RETRIES_KIND {
2211 Stream::new(
2212 self.location.clone(),
2213 self.ir_node.replace(HydroNode::Placeholder),
2214 )
2215 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2216 // We can always weaken the retries guarantee
2217 Stream::new(
2218 self.location.clone(),
2219 HydroNode::Cast {
2220 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2221 metadata: self
2222 .location
2223 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2224 },
2225 )
2226 } else {
2227 Stream::new(
2228 self.location.clone(),
2229 HydroNode::ObserveNonDet {
2230 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2231 trusted: true,
2232 metadata: self
2233 .location
2234 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2235 },
2236 )
2237 }
2238 }
2239
2240 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2241 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2242 /// which is always safe because that is the weakest possible guarantee.
2243 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2244 self.weaken_retries::<AtLeastOnce>()
2245 }
2246
2247 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2248 /// enforcing that `R2` is weaker than the input retries guarantee.
2249 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2250 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2251 self.assume_retries_trusted::<R2>(nondet)
2252 }
2253
2254 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2255 /// implies that `R == ExactlyOnce`.
2256 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2257 where
2258 R: IsExactlyOnce,
2259 {
2260 self.assume_retries_trusted(nondet!(/** no-op */))
2261 }
2262
2263 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2264 /// implies that `B == Bounded`.
2265 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2266 where
2267 B: IsBounded,
2268 {
2269 self.weaken_boundedness()
2270 }
2271
2272 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2273 /// which implies that `B == Bounded`.
2274 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2275 if B::BOUNDED == B2::BOUNDED {
2276 Stream::new(
2277 self.location.clone(),
2278 self.ir_node.replace(HydroNode::Placeholder),
2279 )
2280 } else {
2281 // We can always weaken the boundedness
2282 Stream::new(
2283 self.location.clone(),
2284 HydroNode::Cast {
2285 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2286 metadata: self
2287 .location
2288 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2289 },
2290 )
2291 }
2292 }
2293}
2294
2295impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2296where
2297 L: Location<'a>,
2298{
2299 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2300 ///
2301 /// # Example
2302 /// ```rust
2303 /// # #[cfg(feature = "deploy")] {
2304 /// # use hydro_lang::prelude::*;
2305 /// # use futures::StreamExt;
2306 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2307 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2308 /// # }, |mut stream| async move {
2309 /// // 1, 2, 3
2310 /// # for w in vec![1, 2, 3] {
2311 /// # assert_eq!(stream.next().await.unwrap(), w);
2312 /// # }
2313 /// # }));
2314 /// # }
2315 /// ```
2316 pub fn cloned(self) -> Stream<T, L, B, O, R>
2317 where
2318 T: Clone,
2319 {
2320 self.map(q!(|d| d.clone()))
2321 }
2322}
2323
2324impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2325where
2326 L: Location<'a>,
2327{
2328 /// Computes the number of elements in the stream as a [`Singleton`].
2329 ///
2330 /// # Example
2331 /// ```rust
2332 /// # #[cfg(feature = "deploy")] {
2333 /// # use hydro_lang::prelude::*;
2334 /// # use futures::StreamExt;
2335 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2336 /// let tick = process.tick();
2337 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2338 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2339 /// batch.count().all_ticks()
2340 /// # }, |mut stream| async move {
2341 /// // 4
2342 /// # assert_eq!(stream.next().await.unwrap(), 4);
2343 /// # }));
2344 /// # }
2345 /// ```
2346 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2347 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2348 /// Order does not affect eventual count, and also does not affect intermediate states.
2349 ))
2350 .fold(
2351 q!(|| 0usize),
2352 q!(
2353 |count, _| *count += 1,
2354 monotone = manual_proof!(/** += 1 is monotone */)
2355 ),
2356 )
2357 }
2358}
2359
2360impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2361 /// Produces a new stream that merges the elements of the two input streams.
2362 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2363 ///
2364 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2365 /// [`Bounded`], you can use [`Stream::chain`] instead.
2366 ///
2367 /// # Example
2368 /// ```rust
2369 /// # #[cfg(feature = "deploy")] {
2370 /// # use hydro_lang::prelude::*;
2371 /// # use futures::StreamExt;
2372 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2373 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2374 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2375 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2376 /// # }, |mut stream| async move {
2377 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2378 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2379 /// # assert_eq!(stream.next().await.unwrap(), w);
2380 /// # }
2381 /// # }));
2382 /// # }
2383 /// ```
2384 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2385 self,
2386 other: Stream<T, L, Unbounded, O2, R2>,
2387 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2388 where
2389 R: MinRetries<R2>,
2390 {
2391 Stream::new(
2392 self.location.clone(),
2393 HydroNode::Chain {
2394 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2395 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2396 metadata: self.location.new_node_metadata(Stream::<
2397 T,
2398 L,
2399 Unbounded,
2400 NoOrder,
2401 <R as MinRetries<R2>>::Min,
2402 >::collection_kind()),
2403 },
2404 )
2405 }
2406
2407 /// Deprecated: use [`Stream::merge_unordered`] instead.
2408 #[deprecated(note = "use `merge_unordered` instead")]
2409 pub fn interleave<O2: Ordering, R2: Retries>(
2410 self,
2411 other: Stream<T, L, Unbounded, O2, R2>,
2412 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2413 where
2414 R: MinRetries<R2>,
2415 {
2416 self.merge_unordered(other)
2417 }
2418}
2419
2420impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2421 /// Produces a new stream that combines the elements of the two input streams,
2422 /// preserving the relative order of elements within each input.
2423 ///
2424 /// # Non-Determinism
2425 /// The order in which elements *across* the two streams will be interleaved is
2426 /// non-deterministic, so the order of elements will vary across runs. If the output
2427 /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2428 /// but emits an unordered stream. For deterministic first-then-second ordering on
2429 /// bounded streams, use [`Stream::chain`].
2430 ///
2431 /// # Example
2432 /// ```rust
2433 /// # #[cfg(feature = "deploy")] {
2434 /// # use hydro_lang::prelude::*;
2435 /// # use futures::StreamExt;
2436 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2437 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2438 /// # process.source_iter(q!(vec![1, 3])).into();
2439 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2440 /// # }, |mut stream| async move {
2441 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2442 /// # for w in vec![1, 3, 2, 4] {
2443 /// # assert_eq!(stream.next().await.unwrap(), w);
2444 /// # }
2445 /// # }));
2446 /// # }
2447 /// ```
2448 pub fn merge_ordered<R2: Retries>(
2449 self,
2450 other: Stream<T, L, B, TotalOrder, R2>,
2451 _nondet: NonDet,
2452 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2453 where
2454 R: MinRetries<R2>,
2455 {
2456 let target_location = self.location().drop_consistency();
2457 Stream::new(
2458 target_location.clone(),
2459 HydroNode::MergeOrdered {
2460 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2461 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2462 metadata: target_location.new_node_metadata(Stream::<
2463 T,
2464 L::DropConsistency,
2465 B,
2466 TotalOrder,
2467 <R as MinRetries<R2>>::Min,
2468 >::collection_kind()),
2469 },
2470 )
2471 }
2472}
2473
2474impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2475where
2476 L: Location<'a>,
2477{
2478 /// Produces a new stream that emits the input elements in sorted order.
2479 ///
2480 /// The input stream can have any ordering guarantee, but the output stream
2481 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2482 /// elements in the input stream are available, so it requires the input stream
2483 /// to be [`Bounded`].
2484 ///
2485 /// # Example
2486 /// ```rust
2487 /// # #[cfg(feature = "deploy")] {
2488 /// # use hydro_lang::prelude::*;
2489 /// # use futures::StreamExt;
2490 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2491 /// let tick = process.tick();
2492 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2493 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2494 /// batch.sort().all_ticks()
2495 /// # }, |mut stream| async move {
2496 /// // 1, 2, 3, 4
2497 /// # for w in (1..5) {
2498 /// # assert_eq!(stream.next().await.unwrap(), w);
2499 /// # }
2500 /// # }));
2501 /// # }
2502 /// ```
2503 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2504 where
2505 B: IsBounded,
2506 T: Ord,
2507 {
2508 let this = self.make_bounded();
2509 Stream::new(
2510 this.location.clone(),
2511 HydroNode::Sort {
2512 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2513 metadata: this
2514 .location
2515 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2516 },
2517 )
2518 }
2519
2520 /// Produces a new stream that first emits the elements of the `self` stream,
2521 /// and then emits the elements of the `other` stream. The output stream has
2522 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2523 /// [`TotalOrder`] guarantee.
2524 ///
2525 /// Currently, both input streams must be [`Bounded`]. This operator will block
2526 /// on the first stream until all its elements are available. In a future version,
2527 /// we will relax the requirement on the `other` stream.
2528 ///
2529 /// # Example
2530 /// ```rust
2531 /// # #[cfg(feature = "deploy")] {
2532 /// # use hydro_lang::prelude::*;
2533 /// # use futures::StreamExt;
2534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2535 /// let tick = process.tick();
2536 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2537 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2538 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2539 /// # }, |mut stream| async move {
2540 /// // 2, 3, 4, 5, 1, 2, 3, 4
2541 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2542 /// # assert_eq!(stream.next().await.unwrap(), w);
2543 /// # }
2544 /// # }));
2545 /// # }
2546 /// ```
2547 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2548 self,
2549 other: Stream<T, L, B2, O2, R2>,
2550 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2551 where
2552 B: IsBounded,
2553 O: MinOrder<O2>,
2554 R: MinRetries<R2>,
2555 {
2556 check_matching_location(&self.location, &other.location);
2557
2558 Stream::new(
2559 self.location.clone(),
2560 HydroNode::Chain {
2561 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2562 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2563 metadata: self.location.new_node_metadata(Stream::<
2564 T,
2565 L,
2566 B2,
2567 <O as MinOrder<O2>>::Min,
2568 <R as MinRetries<R2>>::Min,
2569 >::collection_kind()),
2570 },
2571 )
2572 }
2573
2574 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2575 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2576 /// because this is compiled into a nested loop.
2577 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2578 self,
2579 other: Stream<T2, L, Bounded, O2, R>,
2580 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2581 where
2582 B: IsBounded,
2583 T: Clone,
2584 T2: Clone,
2585 {
2586 let this = self.make_bounded();
2587 check_matching_location(&this.location, &other.location);
2588
2589 Stream::new(
2590 this.location.clone(),
2591 HydroNode::CrossProduct {
2592 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2593 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2594 metadata: this.location.new_node_metadata(Stream::<
2595 (T, T2),
2596 L,
2597 Bounded,
2598 <O2 as MinOrder<O>>::Min,
2599 R,
2600 >::collection_kind()),
2601 },
2602 )
2603 }
2604
2605 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2606 /// `self` used as the values for *each* key.
2607 ///
2608 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2609 /// values. For example, it can be used to send the same set of elements to several cluster
2610 /// members, if the membership information is available as a [`KeyedSingleton`].
2611 ///
2612 /// # Example
2613 /// ```rust
2614 /// # #[cfg(feature = "deploy")] {
2615 /// # use hydro_lang::prelude::*;
2616 /// # use futures::StreamExt;
2617 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2618 /// # let tick = process.tick();
2619 /// let keyed_singleton = // { 1: (), 2: () }
2620 /// # process
2621 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2622 /// # .into_keyed()
2623 /// # .batch(&tick, nondet!(/** test */))
2624 /// # .first();
2625 /// let stream = // [ "a", "b" ]
2626 /// # process
2627 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2628 /// # .batch(&tick, nondet!(/** test */));
2629 /// stream.repeat_with_keys(keyed_singleton)
2630 /// # .entries().all_ticks()
2631 /// # }, |mut stream| async move {
2632 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2633 /// # let mut results = Vec::new();
2634 /// # for _ in 0..4 {
2635 /// # results.push(stream.next().await.unwrap());
2636 /// # }
2637 /// # results.sort();
2638 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2639 /// # }));
2640 /// # }
2641 /// ```
2642 pub fn repeat_with_keys<K, V2>(
2643 self,
2644 keys: KeyedSingleton<K, V2, L, Bounded>,
2645 ) -> KeyedStream<K, T, L, Bounded, O, R>
2646 where
2647 B: IsBounded,
2648 K: Clone,
2649 T: Clone,
2650 {
2651 keys.keys()
2652 .weaken_retries()
2653 .assume_ordering_trusted::<TotalOrder>(
2654 nondet!(/** keyed stream does not depend on ordering of keys */),
2655 )
2656 .cross_product_nested_loop(self.make_bounded())
2657 .into_keyed()
2658 }
2659
2660 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2661 /// execution until all results are available. The output order is based on when futures
2662 /// complete, and may be different than the input order.
2663 ///
2664 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2665 /// while futures are pending, this variant blocks until the futures resolve.
2666 ///
2667 /// # Example
2668 /// ```rust
2669 /// # #[cfg(feature = "deploy")] {
2670 /// # use std::collections::HashSet;
2671 /// # use futures::StreamExt;
2672 /// # use hydro_lang::prelude::*;
2673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2674 /// process
2675 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2676 /// .map(q!(|x| async move {
2677 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2678 /// x
2679 /// }))
2680 /// .resolve_futures_blocking()
2681 /// # },
2682 /// # |mut stream| async move {
2683 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2684 /// # let mut output = HashSet::new();
2685 /// # for _ in 1..10 {
2686 /// # output.insert(stream.next().await.unwrap());
2687 /// # }
2688 /// # assert_eq!(
2689 /// # output,
2690 /// # HashSet::<i32>::from_iter(1..10)
2691 /// # );
2692 /// # },
2693 /// # ));
2694 /// # }
2695 /// ```
2696 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2697 where
2698 T: Future,
2699 {
2700 Stream::new(
2701 self.location.clone(),
2702 HydroNode::ResolveFuturesBlocking {
2703 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2704 metadata: self
2705 .location
2706 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2707 },
2708 )
2709 }
2710
2711 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2712 ///
2713 /// # Example
2714 /// ```rust
2715 /// # #[cfg(feature = "deploy")] {
2716 /// # use hydro_lang::prelude::*;
2717 /// # use futures::StreamExt;
2718 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2719 /// let tick = process.tick();
2720 /// let empty: Stream<i32, _, Bounded> = process
2721 /// .source_iter(q!(Vec::<i32>::new()))
2722 /// .batch(&tick, nondet!(/** test */));
2723 /// empty.is_empty().all_ticks()
2724 /// # }, |mut stream| async move {
2725 /// // true
2726 /// # assert_eq!(stream.next().await.unwrap(), true);
2727 /// # }));
2728 /// # }
2729 /// ```
2730 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2731 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2732 where
2733 B: IsBounded,
2734 {
2735 self.make_bounded()
2736 .assume_ordering_trusted::<TotalOrder>(
2737 nondet!(/** is_empty intermediates unaffected by order */),
2738 )
2739 .first()
2740 .is_none()
2741 }
2742}
2743
2744impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2745where
2746 L: Location<'a>,
2747{
2748 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2749 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2750 /// by equi-joining the two streams on the key attribute `K`.
2751 ///
2752 /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2753 /// and streams the left side through, preserving the left side's ordering. When both
2754 /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2755 ///
2756 /// # Example
2757 /// ```rust
2758 /// # #[cfg(feature = "deploy")] {
2759 /// # use hydro_lang::prelude::*;
2760 /// # use std::collections::HashSet;
2761 /// # use futures::StreamExt;
2762 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2763 /// let tick = process.tick();
2764 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2765 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2766 /// stream1.join(stream2)
2767 /// # }, |mut stream| async move {
2768 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2769 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2770 /// # stream.map(|i| assert!(expected.contains(&i)));
2771 /// # }));
2772 /// # }
2773 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2774 self,
2775 n: Stream<(K, V2), L, B2, O2, R2>,
2776 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2777 where
2778 K: Eq + Hash + Clone,
2779 R: MinRetries<R2>,
2780 V1: Clone,
2781 V2: Clone,
2782 {
2783 check_matching_location(&self.location, &n.location);
2784
2785 let ir_node = if B2::BOUNDED {
2786 HydroNode::JoinHalf {
2787 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2789 metadata: self.location.new_node_metadata(Stream::<
2790 (K, (V1, V2)),
2791 L,
2792 B,
2793 B2::PreserveOrderIfBounded<O>,
2794 <R as MinRetries<R2>>::Min,
2795 >::collection_kind()),
2796 }
2797 } else {
2798 HydroNode::Join {
2799 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2800 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2801 metadata: self.location.new_node_metadata(Stream::<
2802 (K, (V1, V2)),
2803 L,
2804 B,
2805 B2::PreserveOrderIfBounded<O>,
2806 <R as MinRetries<R2>>::Min,
2807 >::collection_kind()),
2808 }
2809 };
2810
2811 Stream::new(self.location.clone(), ir_node)
2812 }
2813
2814 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2815 /// computes the anti-join of the items in the input -- i.e. returns
2816 /// unique items in the first input that do not have a matching key
2817 /// in the second input.
2818 ///
2819 /// # Example
2820 /// ```rust
2821 /// # #[cfg(feature = "deploy")] {
2822 /// # use hydro_lang::prelude::*;
2823 /// # use futures::StreamExt;
2824 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2825 /// let tick = process.tick();
2826 /// let stream = process
2827 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2828 /// .batch(&tick, nondet!(/** test */));
2829 /// let batch = process
2830 /// .source_iter(q!(vec![1, 2]))
2831 /// .batch(&tick, nondet!(/** test */));
2832 /// stream.anti_join(batch).all_ticks()
2833 /// # }, |mut stream| async move {
2834 /// # for w in vec![(3, 'c'), (4, 'd')] {
2835 /// # assert_eq!(stream.next().await.unwrap(), w);
2836 /// # }
2837 /// # }));
2838 /// # }
2839 pub fn anti_join<O2: Ordering, R2: Retries>(
2840 self,
2841 n: Stream<K, L, Bounded, O2, R2>,
2842 ) -> Stream<(K, V1), L, B, O, R>
2843 where
2844 K: Eq + Hash,
2845 {
2846 check_matching_location(&self.location, &n.location);
2847
2848 Stream::new(
2849 self.location.clone(),
2850 HydroNode::AntiJoin {
2851 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2852 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2853 metadata: self
2854 .location
2855 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2856 },
2857 )
2858 }
2859}
2860
2861impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2862 Stream<(K, V), L, B, O, R>
2863{
2864 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2865 /// is used as the key and the second element is added to the entries associated with that key.
2866 ///
2867 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2868 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2869 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2870 /// total ordering _within_ each group but no ordering _across_ groups.
2871 ///
2872 /// # Example
2873 /// ```rust
2874 /// # #[cfg(feature = "deploy")] {
2875 /// # use hydro_lang::prelude::*;
2876 /// # use futures::StreamExt;
2877 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2878 /// process
2879 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2880 /// .into_keyed()
2881 /// # .entries()
2882 /// # }, |mut stream| async move {
2883 /// // { 1: [2, 3], 2: [4] }
2884 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2885 /// # assert_eq!(stream.next().await.unwrap(), w);
2886 /// # }
2887 /// # }));
2888 /// # }
2889 /// ```
2890 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2891 KeyedStream::new(
2892 self.location.clone(),
2893 HydroNode::Cast {
2894 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2895 metadata: self
2896 .location
2897 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2898 },
2899 )
2900 }
2901}
2902
2903impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2904where
2905 K: Eq + Hash,
2906 L: Location<'a>,
2907{
2908 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2909 /// # Example
2910 /// ```rust
2911 /// # #[cfg(feature = "deploy")] {
2912 /// # use hydro_lang::prelude::*;
2913 /// # use futures::StreamExt;
2914 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2915 /// let tick = process.tick();
2916 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2917 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2918 /// batch.keys().all_ticks()
2919 /// # }, |mut stream| async move {
2920 /// // 1, 2
2921 /// # assert_eq!(stream.next().await.unwrap(), 1);
2922 /// # assert_eq!(stream.next().await.unwrap(), 2);
2923 /// # }));
2924 /// # }
2925 /// ```
2926 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2927 self.into_keyed()
2928 .fold(
2929 q!(|| ()),
2930 q!(
2931 |_, _| {},
2932 commutative = manual_proof!(/** values are ignored */),
2933 idempotent = manual_proof!(/** values are ignored */)
2934 ),
2935 )
2936 .keys()
2937 }
2938}
2939
2940impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2941where
2942 L: Location<'a>,
2943{
2944 /// Returns a stream corresponding to the latest batch of elements being atomically
2945 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2946 /// the order of the input.
2947 ///
2948 /// # Non-Determinism
2949 /// The batch boundaries are non-deterministic and may change across executions.
2950 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2951 self,
2952 tick: &Tick<L2>,
2953 _nondet: NonDet,
2954 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2955 Stream::new(
2956 tick.drop_consistency(),
2957 HydroNode::Batch {
2958 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2959 metadata: tick
2960 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2961 },
2962 )
2963 }
2964
2965 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2966 /// See [`Stream::atomic`] for more details.
2967 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2968 Stream::new(
2969 self.location.tick.l.clone(),
2970 HydroNode::EndAtomic {
2971 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2972 metadata: self
2973 .location
2974 .tick
2975 .l
2976 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2977 },
2978 )
2979 }
2980}
2981
2982impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2983where
2984 L: TopLevel<'a>,
2985 F: Future<Output = T>,
2986{
2987 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2988 /// Future outputs are produced as available, regardless of input arrival order.
2989 ///
2990 /// # Example
2991 /// ```rust
2992 /// # #[cfg(feature = "deploy")] {
2993 /// # use std::collections::HashSet;
2994 /// # use futures::StreamExt;
2995 /// # use hydro_lang::prelude::*;
2996 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2997 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2998 /// .map(q!(|x| async move {
2999 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3000 /// x
3001 /// }))
3002 /// .resolve_futures()
3003 /// # },
3004 /// # |mut stream| async move {
3005 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3006 /// # let mut output = HashSet::new();
3007 /// # for _ in 1..10 {
3008 /// # output.insert(stream.next().await.unwrap());
3009 /// # }
3010 /// # assert_eq!(
3011 /// # output,
3012 /// # HashSet::<i32>::from_iter(1..10)
3013 /// # );
3014 /// # },
3015 /// # ));
3016 /// # }
3017 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3018 Stream::new(
3019 self.location.clone(),
3020 HydroNode::ResolveFutures {
3021 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3022 metadata: self
3023 .location
3024 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3025 },
3026 )
3027 }
3028
3029 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3030 /// Future outputs are produced in the same order as the input stream.
3031 ///
3032 /// # Example
3033 /// ```rust
3034 /// # #[cfg(feature = "deploy")] {
3035 /// # use std::collections::HashSet;
3036 /// # use futures::StreamExt;
3037 /// # use hydro_lang::prelude::*;
3038 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3039 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3040 /// .map(q!(|x| async move {
3041 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3042 /// x
3043 /// }))
3044 /// .resolve_futures_ordered()
3045 /// # },
3046 /// # |mut stream| async move {
3047 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3048 /// # let mut output = Vec::new();
3049 /// # for _ in 1..10 {
3050 /// # output.push(stream.next().await.unwrap());
3051 /// # }
3052 /// # assert_eq!(
3053 /// # output,
3054 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3055 /// # );
3056 /// # },
3057 /// # ));
3058 /// # }
3059 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3060 Stream::new(
3061 self.location.clone(),
3062 HydroNode::ResolveFuturesOrdered {
3063 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3064 metadata: self
3065 .location
3066 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3067 },
3068 )
3069 }
3070}
3071
3072impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3073where
3074 L: Location<'a>,
3075{
3076 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3077 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3078 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3079 Stream::new(
3080 self.location.outer().clone(),
3081 HydroNode::YieldConcat {
3082 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3083 metadata: self
3084 .location
3085 .outer()
3086 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3087 },
3088 )
3089 }
3090
3091 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3092 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3093 ///
3094 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3095 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3096 /// stream's [`Tick`] context.
3097 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3098 let out_location = Atomic {
3099 tick: self.location.clone(),
3100 };
3101
3102 Stream::new(
3103 out_location.clone(),
3104 HydroNode::YieldConcat {
3105 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3106 metadata: out_location
3107 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3108 },
3109 )
3110 }
3111
3112 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3113 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3114 /// input.
3115 ///
3116 /// This API is particularly useful for stateful computation on batches of data, such as
3117 /// maintaining an accumulated state that is up to date with the current batch.
3118 ///
3119 /// # Example
3120 /// ```rust
3121 /// # #[cfg(feature = "deploy")] {
3122 /// # use hydro_lang::prelude::*;
3123 /// # use futures::StreamExt;
3124 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3125 /// let tick = process.tick();
3126 /// # // ticks are lazy by default, forces the second tick to run
3127 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3128 /// # let batch_first_tick = process
3129 /// # .source_iter(q!(vec![1, 2, 3, 4]))
3130 /// # .batch(&tick, nondet!(/** test */));
3131 /// # let batch_second_tick = process
3132 /// # .source_iter(q!(vec![5, 6, 7]))
3133 /// # .batch(&tick, nondet!(/** test */))
3134 /// # .defer_tick(); // appears on the second tick
3135 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3136 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3137 ///
3138 /// input.batch(&tick, nondet!(/** test */))
3139 /// .across_ticks(|s| s.count()).all_ticks()
3140 /// # }, |mut stream| async move {
3141 /// // [4, 7]
3142 /// assert_eq!(stream.next().await.unwrap(), 4);
3143 /// assert_eq!(stream.next().await.unwrap(), 7);
3144 /// # }));
3145 /// # }
3146 /// ```
3147 pub fn across_ticks<Out: BatchAtomic<'a>>(
3148 self,
3149 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3150 ) -> Out::Batched {
3151 thunk(self.all_ticks_atomic()).batched_atomic()
3152 }
3153
3154 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3155 /// always has the elements of `self` at tick `T - 1`.
3156 ///
3157 /// At tick `0`, the output stream is empty, since there is no previous tick.
3158 ///
3159 /// This operator enables stateful iterative processing with ticks, by sending data from one
3160 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3161 ///
3162 /// # Example
3163 /// ```rust
3164 /// # #[cfg(feature = "deploy")] {
3165 /// # use hydro_lang::prelude::*;
3166 /// # use futures::StreamExt;
3167 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3168 /// let tick = process.tick();
3169 /// // ticks are lazy by default, forces the second tick to run
3170 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3171 ///
3172 /// let batch_first_tick = process
3173 /// .source_iter(q!(vec![1, 2, 3, 4]))
3174 /// .batch(&tick, nondet!(/** test */));
3175 /// let batch_second_tick = process
3176 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
3177 /// .batch(&tick, nondet!(/** test */))
3178 /// .defer_tick(); // appears on the second tick
3179 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3180 ///
3181 /// changes_across_ticks.clone().filter_not_in(
3182 /// changes_across_ticks.defer_tick() // the elements from the previous tick
3183 /// ).all_ticks()
3184 /// # }, |mut stream| async move {
3185 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3186 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3187 /// # assert_eq!(stream.next().await.unwrap(), w);
3188 /// # }
3189 /// # }));
3190 /// # }
3191 /// ```
3192 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3193 Stream::new(
3194 self.location.clone(),
3195 HydroNode::DeferTick {
3196 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3197 metadata: self
3198 .location
3199 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3200 },
3201 )
3202 }
3203}
3204
3205#[cfg(test)]
3206mod tests {
3207 #[cfg(feature = "deploy")]
3208 use futures::{SinkExt, StreamExt};
3209 #[cfg(feature = "deploy")]
3210 use hydro_deploy::Deployment;
3211 #[cfg(feature = "deploy")]
3212 use serde::{Deserialize, Serialize};
3213 #[cfg(any(feature = "deploy", feature = "sim"))]
3214 use stageleft::q;
3215
3216 #[cfg(any(feature = "deploy", feature = "sim"))]
3217 use crate::compile::builder::FlowBuilder;
3218 #[cfg(feature = "deploy")]
3219 use crate::live_collections::sliced::sliced;
3220 #[cfg(feature = "deploy")]
3221 use crate::live_collections::stream::ExactlyOnce;
3222 #[cfg(feature = "sim")]
3223 use crate::live_collections::stream::NoOrder;
3224 #[cfg(any(feature = "deploy", feature = "sim"))]
3225 use crate::live_collections::stream::TotalOrder;
3226 #[cfg(any(feature = "deploy", feature = "sim"))]
3227 use crate::location::Location;
3228 #[cfg(feature = "sim")]
3229 use crate::networking::TCP;
3230 #[cfg(any(feature = "deploy", feature = "sim"))]
3231 use crate::nondet::nondet;
3232
3233 mod backtrace_chained_ops;
3234
3235 #[cfg(feature = "deploy")]
3236 struct P1 {}
3237 #[cfg(feature = "deploy")]
3238 struct P2 {}
3239
3240 #[cfg(feature = "deploy")]
3241 #[derive(Serialize, Deserialize, Debug)]
3242 struct SendOverNetwork {
3243 n: u32,
3244 }
3245
3246 #[cfg(feature = "deploy")]
3247 #[tokio::test]
3248 async fn first_ten_distributed() {
3249 use crate::networking::TCP;
3250
3251 let mut deployment = Deployment::new();
3252
3253 let mut flow = FlowBuilder::new();
3254 let first_node = flow.process::<P1>();
3255 let second_node = flow.process::<P2>();
3256 let external = flow.external::<P2>();
3257
3258 let numbers = first_node.source_iter(q!(0..10));
3259 let out_port = numbers
3260 .map(q!(|n| SendOverNetwork { n }))
3261 .send(&second_node, TCP.fail_stop().bincode())
3262 .send_bincode_external(&external);
3263
3264 let nodes = flow
3265 .with_process(&first_node, deployment.Localhost())
3266 .with_process(&second_node, deployment.Localhost())
3267 .with_external(&external, deployment.Localhost())
3268 .deploy(&mut deployment);
3269
3270 deployment.deploy().await.unwrap();
3271
3272 let mut external_out = nodes.connect(out_port).await;
3273
3274 deployment.start().await.unwrap();
3275
3276 for i in 0..10 {
3277 assert_eq!(external_out.next().await.unwrap().n, i);
3278 }
3279 }
3280
3281 #[cfg(feature = "deploy")]
3282 #[tokio::test]
3283 async fn first_cardinality() {
3284 let mut deployment = Deployment::new();
3285
3286 let mut flow = FlowBuilder::new();
3287 let node = flow.process::<()>();
3288 let external = flow.external::<()>();
3289
3290 let node_tick = node.tick();
3291 let count = node_tick
3292 .singleton(q!([1, 2, 3]))
3293 .into_stream()
3294 .flatten_ordered()
3295 .first()
3296 .into_stream()
3297 .count()
3298 .all_ticks()
3299 .send_bincode_external(&external);
3300
3301 let nodes = flow
3302 .with_process(&node, deployment.Localhost())
3303 .with_external(&external, deployment.Localhost())
3304 .deploy(&mut deployment);
3305
3306 deployment.deploy().await.unwrap();
3307
3308 let mut external_out = nodes.connect(count).await;
3309
3310 deployment.start().await.unwrap();
3311
3312 assert_eq!(external_out.next().await.unwrap(), 1);
3313 }
3314
3315 #[cfg(feature = "deploy")]
3316 #[tokio::test]
3317 async fn unbounded_reduce_remembers_state() {
3318 let mut deployment = Deployment::new();
3319
3320 let mut flow = FlowBuilder::new();
3321 let node = flow.process::<()>();
3322 let external = flow.external::<()>();
3323
3324 let (input_port, input) = node.source_external_bincode(&external);
3325 let out = input
3326 .reduce(q!(|acc, v| *acc += v))
3327 .sample_eager(nondet!(/** test */))
3328 .send_bincode_external(&external);
3329
3330 let nodes = flow
3331 .with_process(&node, deployment.Localhost())
3332 .with_external(&external, deployment.Localhost())
3333 .deploy(&mut deployment);
3334
3335 deployment.deploy().await.unwrap();
3336
3337 let mut external_in = nodes.connect(input_port).await;
3338 let mut external_out = nodes.connect(out).await;
3339
3340 deployment.start().await.unwrap();
3341
3342 external_in.send(1).await.unwrap();
3343 assert_eq!(external_out.next().await.unwrap(), 1);
3344
3345 external_in.send(2).await.unwrap();
3346 assert_eq!(external_out.next().await.unwrap(), 3);
3347 }
3348
3349 #[cfg(feature = "deploy")]
3350 #[tokio::test]
3351 async fn top_level_bounded_cross_singleton() {
3352 let mut deployment = Deployment::new();
3353
3354 let mut flow = FlowBuilder::new();
3355 let node = flow.process::<()>();
3356 let external = flow.external::<()>();
3357
3358 let (input_port, input) =
3359 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3360
3361 let out = input
3362 .cross_singleton(
3363 node.source_iter(q!(vec![1, 2, 3]))
3364 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3365 )
3366 .send_bincode_external(&external);
3367
3368 let nodes = flow
3369 .with_process(&node, deployment.Localhost())
3370 .with_external(&external, deployment.Localhost())
3371 .deploy(&mut deployment);
3372
3373 deployment.deploy().await.unwrap();
3374
3375 let mut external_in = nodes.connect(input_port).await;
3376 let mut external_out = nodes.connect(out).await;
3377
3378 deployment.start().await.unwrap();
3379
3380 external_in.send(1).await.unwrap();
3381 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3382
3383 external_in.send(2).await.unwrap();
3384 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3385 }
3386
3387 #[cfg(feature = "deploy")]
3388 #[tokio::test]
3389 async fn top_level_bounded_reduce_cardinality() {
3390 let mut deployment = Deployment::new();
3391
3392 let mut flow = FlowBuilder::new();
3393 let node = flow.process::<()>();
3394 let external = flow.external::<()>();
3395
3396 let (input_port, input) =
3397 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3398
3399 let out = sliced! {
3400 let input = use(input, nondet!(/** test */));
3401 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3402 input.cross_singleton(v.into_stream().count())
3403 }
3404 .send_bincode_external(&external);
3405
3406 let nodes = flow
3407 .with_process(&node, deployment.Localhost())
3408 .with_external(&external, deployment.Localhost())
3409 .deploy(&mut deployment);
3410
3411 deployment.deploy().await.unwrap();
3412
3413 let mut external_in = nodes.connect(input_port).await;
3414 let mut external_out = nodes.connect(out).await;
3415
3416 deployment.start().await.unwrap();
3417
3418 external_in.send(1).await.unwrap();
3419 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3420
3421 external_in.send(2).await.unwrap();
3422 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3423 }
3424
3425 #[cfg(feature = "deploy")]
3426 #[tokio::test]
3427 async fn top_level_bounded_into_singleton_cardinality() {
3428 let mut deployment = Deployment::new();
3429
3430 let mut flow = FlowBuilder::new();
3431 let node = flow.process::<()>();
3432 let external = flow.external::<()>();
3433
3434 let (input_port, input) =
3435 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3436
3437 let out = sliced! {
3438 let input = use(input, nondet!(/** test */));
3439 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3440 input.cross_singleton(v.into_stream().count())
3441 }
3442 .send_bincode_external(&external);
3443
3444 let nodes = flow
3445 .with_process(&node, deployment.Localhost())
3446 .with_external(&external, deployment.Localhost())
3447 .deploy(&mut deployment);
3448
3449 deployment.deploy().await.unwrap();
3450
3451 let mut external_in = nodes.connect(input_port).await;
3452 let mut external_out = nodes.connect(out).await;
3453
3454 deployment.start().await.unwrap();
3455
3456 external_in.send(1).await.unwrap();
3457 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3458
3459 external_in.send(2).await.unwrap();
3460 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3461 }
3462
3463 #[cfg(feature = "deploy")]
3464 #[tokio::test]
3465 async fn atomic_fold_replays_each_tick() {
3466 let mut deployment = Deployment::new();
3467
3468 let mut flow = FlowBuilder::new();
3469 let node = flow.process::<()>();
3470 let external = flow.external::<()>();
3471
3472 let (input_port, input) =
3473 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3474 let tick = node.tick();
3475
3476 let out = input
3477 .batch(&tick, nondet!(/** test */))
3478 .cross_singleton(
3479 node.source_iter(q!(vec![1, 2, 3]))
3480 .atomic()
3481 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3482 .snapshot_atomic(&tick, nondet!(/** test */)),
3483 )
3484 .all_ticks()
3485 .send_bincode_external(&external);
3486
3487 let nodes = flow
3488 .with_process(&node, deployment.Localhost())
3489 .with_external(&external, deployment.Localhost())
3490 .deploy(&mut deployment);
3491
3492 deployment.deploy().await.unwrap();
3493
3494 let mut external_in = nodes.connect(input_port).await;
3495 let mut external_out = nodes.connect(out).await;
3496
3497 deployment.start().await.unwrap();
3498
3499 external_in.send(1).await.unwrap();
3500 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3501
3502 external_in.send(2).await.unwrap();
3503 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3504 }
3505
3506 #[cfg(feature = "deploy")]
3507 #[tokio::test]
3508 async fn unbounded_scan_remembers_state() {
3509 let mut deployment = Deployment::new();
3510
3511 let mut flow = FlowBuilder::new();
3512 let node = flow.process::<()>();
3513 let external = flow.external::<()>();
3514
3515 let (input_port, input) = node.source_external_bincode(&external);
3516 let out = input
3517 .scan(
3518 q!(|| 0),
3519 q!(|acc, v| {
3520 *acc += v;
3521 Some(*acc)
3522 }),
3523 )
3524 .send_bincode_external(&external);
3525
3526 let nodes = flow
3527 .with_process(&node, deployment.Localhost())
3528 .with_external(&external, deployment.Localhost())
3529 .deploy(&mut deployment);
3530
3531 deployment.deploy().await.unwrap();
3532
3533 let mut external_in = nodes.connect(input_port).await;
3534 let mut external_out = nodes.connect(out).await;
3535
3536 deployment.start().await.unwrap();
3537
3538 external_in.send(1).await.unwrap();
3539 assert_eq!(external_out.next().await.unwrap(), 1);
3540
3541 external_in.send(2).await.unwrap();
3542 assert_eq!(external_out.next().await.unwrap(), 3);
3543 }
3544
3545 #[cfg(feature = "deploy")]
3546 #[tokio::test]
3547 async fn unbounded_enumerate_remembers_state() {
3548 let mut deployment = Deployment::new();
3549
3550 let mut flow = FlowBuilder::new();
3551 let node = flow.process::<()>();
3552 let external = flow.external::<()>();
3553
3554 let (input_port, input) = node.source_external_bincode(&external);
3555 let out = input.enumerate().send_bincode_external(&external);
3556
3557 let nodes = flow
3558 .with_process(&node, deployment.Localhost())
3559 .with_external(&external, deployment.Localhost())
3560 .deploy(&mut deployment);
3561
3562 deployment.deploy().await.unwrap();
3563
3564 let mut external_in = nodes.connect(input_port).await;
3565 let mut external_out = nodes.connect(out).await;
3566
3567 deployment.start().await.unwrap();
3568
3569 external_in.send(1).await.unwrap();
3570 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3571
3572 external_in.send(2).await.unwrap();
3573 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3574 }
3575
3576 #[cfg(feature = "deploy")]
3577 #[tokio::test]
3578 async fn unbounded_unique_remembers_state() {
3579 let mut deployment = Deployment::new();
3580
3581 let mut flow = FlowBuilder::new();
3582 let node = flow.process::<()>();
3583 let external = flow.external::<()>();
3584
3585 let (input_port, input) =
3586 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3587 let out = input.unique().send_bincode_external(&external);
3588
3589 let nodes = flow
3590 .with_process(&node, deployment.Localhost())
3591 .with_external(&external, deployment.Localhost())
3592 .deploy(&mut deployment);
3593
3594 deployment.deploy().await.unwrap();
3595
3596 let mut external_in = nodes.connect(input_port).await;
3597 let mut external_out = nodes.connect(out).await;
3598
3599 deployment.start().await.unwrap();
3600
3601 external_in.send(1).await.unwrap();
3602 assert_eq!(external_out.next().await.unwrap(), 1);
3603
3604 external_in.send(2).await.unwrap();
3605 assert_eq!(external_out.next().await.unwrap(), 2);
3606
3607 external_in.send(1).await.unwrap();
3608 external_in.send(3).await.unwrap();
3609 assert_eq!(external_out.next().await.unwrap(), 3);
3610 }
3611
3612 #[cfg(feature = "sim")]
3613 #[test]
3614 #[should_panic]
3615 fn sim_batch_nondet_size() {
3616 let mut flow = FlowBuilder::new();
3617 let node = flow.process::<()>();
3618
3619 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3620
3621 let tick = node.tick();
3622 let out_recv = input
3623 .batch(&tick, nondet!(/** test */))
3624 .count()
3625 .all_ticks()
3626 .sim_output();
3627
3628 flow.sim().exhaustive(async || {
3629 in_send.send(());
3630 in_send.send(());
3631 in_send.send(());
3632
3633 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3634 });
3635 }
3636
3637 #[cfg(feature = "sim")]
3638 #[test]
3639 fn sim_batch_preserves_order() {
3640 let mut flow = FlowBuilder::new();
3641 let node = flow.process::<()>();
3642
3643 let (in_send, input) = node.sim_input();
3644
3645 let tick = node.tick();
3646 let out_recv = input
3647 .batch(&tick, nondet!(/** test */))
3648 .all_ticks()
3649 .sim_output();
3650
3651 flow.sim().exhaustive(async || {
3652 in_send.send(1);
3653 in_send.send(2);
3654 in_send.send(3);
3655
3656 out_recv.assert_yields_only([1, 2, 3]).await;
3657 });
3658 }
3659
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_unordered_shuffles() {
    // Each tick emits (min, max) of its batch. Seeing [(1, 3), (2, 2)] means 1
    // and 3 were batched together while 2 landed in a later batch, which is
    // only possible if the unordered input was shuffled before batching.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unordered_in) = proc_node.sim_input::<_, NoOrder, _>();

    let per_tick = proc_node.tick();
    let batched = unordered_in.batch(&per_tick, nondet!(/** test */));
    let min_max_recv = batched
        .clone()
        .min()
        .zip(batched.max())
        .all_ticks()
        .sim_output();

    flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3]);

        let observed = min_max_recv.collect::<Vec<_>>().await;
        let shuffled = observed == vec![(1, 3), (2, 2)];
        if shuffled {
            panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
        }
    });
}
3686
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles_count() {
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unordered_in) = proc_node.sim_input::<_, NoOrder, _>();

    let per_tick = proc_node.tick();
    let flattened_recv = unordered_in
        .batch(&per_tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        flattened_recv
            .assert_yields_only_unordered([1, 2, 3, 4])
            .await;
    });

    // One instance per ordered set partition of the 4 elements:
    // ∑ (k=1 to 4) S(4,k) × k! = 75
    assert_eq!(instance_count, 75);
}
3709
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    // `assume_ordering` on an unordered batch lets the simulator pick any
    // order, so some instance should not yield [1, 2, 3, 4] and the test panics.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unordered_in) = proc_node.sim_input::<_, NoOrder, _>();

    let per_tick = proc_node.tick();
    let batched = unordered_in.batch(&per_tick, nondet!(/** test */));
    let ordered_recv = batched
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        ordered_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
    });
}
3731
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unordered_in) = proc_node.sim_input::<_, NoOrder, _>();

    let per_tick = proc_node.tick();
    let batched = unordered_in.batch(&per_tick, nondet!(/** test */));
    let ordered_recv = batched
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3, 4]);
        // Drain the output; only the number of explored instances matters here.
        let _drained = ordered_recv.collect::<Vec<_>>().await;
    });

    // Presumably 4! observed orders times 2^(4-1) ways to cut each order
    // into consecutive batches.
    assert_eq!(instance_count, 192); // 4! * 2^{4 - 1}
}
3757
#[cfg(feature = "sim")]
#[test]
fn sim_unordered_count_instance_count() {
    // Snapshots a running count of an unordered input on each tick. Every
    // instance must end with a snapshot of the full count (4). The number of
    // instances reflects which intermediate counts get snapshotted.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let tick = node.tick();
    let out_recv = input
        .count()
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([1, 2, 3, 4]);
        let snapshots = out_recv.collect::<Vec<_>>().await;
        // `assert_eq!` reports the offending snapshot sequence on failure,
        // unlike `assert!(… .unwrap() == &4)`, which only said "false" (or
        // panicked on an unhelpful `unwrap` when nothing was emitted).
        assert_eq!(
            snapshots.last(),
            Some(&4),
            "final snapshot must include every element: {:?}",
            snapshots
        );
    });

    assert_eq!(
        instance_count,
        16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
    );
}
3783
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    // Forcing a total order on an unordered top-level stream explores one
    // instance per permutation of the three inputs (3! = 6). The contents
    // are always the same multiset.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unordered_in) = proc_node.sim_input::<_, NoOrder, _>();

    let ordered_recv = unordered_in
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        sender.send_many_unordered([1, 2, 3]);
        let mut observed = ordered_recv.collect::<Vec<_>>().await;
        observed.sort_unstable();
        assert_eq!(observed, vec![1, 2, 3]);
    });

    assert_eq!(instance_count, 6);
}
3805
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    // Inputs are merged with a feedback stream and forced into a total order.
    // Each ordered element is incremented, and odd results make a round trip
    // through `node2` before being fed back into the merge.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (close_feedback, feedback) = node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let sequenced = input
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let round_trip = sequenced
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    close_feedback.complete(round_trip);

    let sequenced_recv = sequenced.sim_output();

    let mut found_prefix = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = sequenced_recv.collect::<Vec<_>>().await;
        // Record whether the fed-back 1 was ever ordered between 0 and 2.
        found_prefix |= observed.starts_with(&[0, 1, 2]);
    });

    assert!(found_prefix, "did not see an instance with 0, 1, 2 in order");
    assert_eq!(instance_count, 6);
}
3844
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    // Same feedback loop as `sim_top_level_assume_ordering_cycle_back`, except
    // the feedback path is additionally batched into ticks and flattened again,
    // which increases the number of explored instances.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (close_feedback, feedback) = node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let sequenced = input
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let round_trip = sequenced
        .clone()
        .batch(&node.tick(), nondet!(/** test */))
        .all_ticks()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    close_feedback.complete(round_trip);

    let sequenced_recv = sequenced.sim_output();

    let mut found_prefix = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = sequenced_recv.collect::<Vec<_>>().await;
        found_prefix |= observed.starts_with(&[0, 1, 2]);
    });

    assert!(found_prefix, "did not see an instance with 0, 1, 2 in order");
    assert_eq!(instance_count, 58);
}
3885
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_multiple() {
    // Two chained `assume_ordering` points: the first orders input + feedback,
    // the second orders (first + 3) merged with a second input whose sender is
    // dropped immediately. Values equal to 3 are fed back into the first merge.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();
    let (_, input2) = node.sim_input::<_, NoOrder, _>();

    let (close_feedback, feedback) = node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let first_ordered = input
        .clone()
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));
    let shifted = first_ordered
        .clone()
        .map(q!(|v| v + 3))
        .weaken_ordering::<NoOrder>()
        .merge_unordered(input2)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let round_trip = shifted
        .filter(q!(|v| *v == 3))
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    close_feedback.complete(round_trip);

    let first_recv = first_ordered.sim_output();

    let mut found_prefix = false;
    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 1]);
        let observed = first_recv.collect::<Vec<_>>().await;
        found_prefix |= observed.starts_with(&[0, 3, 1]);
    });

    assert!(found_prefix, "did not see an instance with 0, 3, 1 in order");
    assert_eq!(instance_count, 24);
}
3930
#[cfg(feature = "sim")]
#[test]
fn sim_atomic_assume_ordering_cycle_back() {
    // Like the top-level cycle-back test, but `assume_ordering` is applied
    // inside an atomic section. Inputs 0 and 2 produce fed-back 1 and 3, so
    // every instance must observe exactly four outputs.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, NoOrder, _>();

    let (close_feedback, feedback) = node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
    let atomically_ordered = input
        .merge_unordered(feedback)
        .atomic()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .end_atomic();

    let round_trip = atomically_ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .send(&node2, TCP.fail_stop().bincode())
        .send(&node, TCP.fail_stop().bincode());
    close_feedback.complete(round_trip);

    let ordered_recv = atomically_ordered.sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        in_send.send_many_unordered([0, 2]);
        let observed = ordered_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.len(), 4);
    });

    assert_eq!(instance_count, 22);
}
3965
#[cfg(feature = "deploy")]
#[tokio::test]
async fn partition_evens_odds() {
    // `partition` routes elements matching the predicate (evens) to the first
    // stream and the rest (odds) to the second. Each half gets its own port.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (evens, odds) = node
        .source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]))
        .partition(q!(|x: &i32| x % 2 == 0));
    let evens_port = evens.send_bincode_external(&external);
    let odds_port = odds.send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut evens_out = nodes.connect(evens_port).await;
    let mut odds_out = nodes.connect(odds_port).await;

    deployment.start().await.unwrap();

    // Arrival order is not asserted, so both halves are sorted before comparing.
    let mut evens_seen = Vec::with_capacity(3);
    for _ in 0..3 {
        evens_seen.push(evens_out.next().await.unwrap());
    }
    evens_seen.sort();
    assert_eq!(evens_seen, vec![2, 4, 6]);

    let mut odds_seen = Vec::with_capacity(3);
    for _ in 0..3 {
        odds_seen.push(odds_out.next().await.unwrap());
    }
    odds_seen.sort();
    assert_eq!(odds_seen, vec![1, 3, 5]);
}
4006
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unconsumed_inspect_still_runs() {
    use crate::deploy::DeployCrateWrapper;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    // The stream returned by `.inspect()` is discarded on purpose. Its side
    // effect (printing) must still execute even though nothing consumes it.
    // Before the Null-root fix, this would silently do nothing.
    node.source_iter(q!(0..5))
        .inspect(q!(|x| println!("inspect: {}", x)));

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut stdout = nodes.get_process(&node).stdout();

    deployment.start().await.unwrap();

    let mut printed = Vec::with_capacity(5);
    for _ in 0..5 {
        printed.push(stdout.recv().await.unwrap());
    }
    // Sort so the comparison does not depend on the order lines were read.
    printed.sort();

    let expected = vec![
        "inspect: 0",
        "inspect: 1",
        "inspect: 2",
        "inspect: 3",
        "inspect: 4",
    ];
    assert_eq!(printed, expected);
}
4048
#[cfg(feature = "sim")]
#[test]
fn sim_limit() {
    // `limit(3)` must pass through only the first three of five sent elements.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unbounded_in) = proc_node.sim_input();

    let limited_recv = unbounded_in.limit(q!(3)).sim_output();

    flow.sim().exhaustive(async || {
        for value in 1..=5 {
            sender.send(value);
        }

        limited_recv.assert_yields_only([1, 2, 3]).await;
    });
}
4069
#[cfg(feature = "sim")]
#[test]
fn sim_limit_zero() {
    // `limit(0)` must suppress every element.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();

    let (sender, unbounded_in) = proc_node.sim_input();

    let limited_recv = unbounded_in.limit(q!(0)).sim_output();

    flow.sim().exhaustive(async || {
        for value in [1, 2] {
            sender.send(value);
        }

        limited_recv.assert_yields_only::<i32, _>([]).await;
    });
}
4087
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered() {
    // `merge_ordered` may interleave its inputs arbitrarily but must keep each
    // input's internal order. With two elements per input there are 6
    // interleavings, and one of them must be [1, 3, 2, 4].
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (first_send, first_in) = node.sim_input();
    let (second_send, second_in) = node.sim_input();

    let merged_recv = first_in
        .merge_ordered(second_in, nondet!(/** test */))
        .sim_output();

    let mut saw_out_of_order = false;
    let instances = flow.sim().exhaustive(async || {
        first_send.send(1);
        first_send.send(2);
        second_send.send(3);
        second_send.send(4);

        let out = merged_recv.collect::<Vec<_>>().await;

        saw_out_of_order |= out == [1, 3, 2, 4];

        // Split by origin (first input sent <= 2, second sent > 2); each side
        // must still appear in the order it was sent.
        let (from_first, from_second): (Vec<_>, Vec<_>) =
            out.iter().copied().partition(|v| *v <= 2);
        assert_eq!(
            from_first,
            vec![1, 2],
            "first input order violated: {:?}",
            out
        );
        assert_eq!(
            from_second,
            vec![3, 4],
            "second input order violated: {:?}",
            out
        );

        let mut combined = from_first;
        combined.extend(from_second);
        combined.sort();
        assert_eq!(combined, vec![1, 2, 3, 4]);
    });

    assert!(saw_out_of_order);
    assert_eq!(instances, 6);
}
4139
/// Checks that `merge_ordered` forwards one input's elements unchanged when
/// the other input never produces anything.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_one_empty() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (active_send, active_in) = node.sim_input();
    // The sender is kept bound (not `_`) so the idle input stays open.
    let (_idle_send, idle_in) = node.sim_input();

    let merged_recv = active_in
        .merge_ordered(idle_in, nondet!(/** test */))
        .sim_output();

    let instances = flow.sim().exhaustive(async || {
        active_send.send(1);
        active_send.send(2);

        let observed = merged_recv.collect::<Vec<_>>().await;
        assert_eq!(observed, vec![1, 2]);
    });

    // With one side empty there is exactly one possible interleaving.
    assert_eq!(instances, 1);
}
4166
/// Checks `merge_ordered` inside a feedback cycle: output elements equal to 1
/// are mapped to 10 and fed back into the second input. The one-at-a-time
/// release must let the fed-back element be emitted before elements still
/// pending on the external input.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_cycle_back() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (external_send, external_in) = node.sim_input();

    let (close_feedback, feedback) = node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();

    let merged = external_in.merge_ordered(feedback, nondet!(/** test */));

    // Only 1 is fed back, as 10.
    close_feedback.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));

    let merged_recv = merged.sim_output();

    let mut saw_cycle_before_second = false;
    flow.sim().exhaustive(async || {
        external_send.send(1);
        external_send.send(2);

        let out = merged_recv.collect::<Vec<_>>().await;

        // 10 is derived from 1, so it can never precede it.
        let pos_1 = out.iter().position(|v| *v == 1).unwrap();
        let pos_10 = out.iter().position(|v| *v == 10).unwrap();
        assert!(pos_1 < pos_10, "causal order violated: {:?}", out);

        // Did the fed-back element beat the second external element?
        saw_cycle_before_second |= out == [1, 10, 2];

        let mut sorted = out;
        sorted.sort();
        assert_eq!(sorted, vec![1, 2, 10]);
    });

    assert!(
        saw_cycle_before_second,
        "never saw the cycled element arrive before the second input element"
    );
}
4221
/// Checks that `merge_ordered` interleaves correctly when one input has a
/// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the late 2 must be
/// able to appear after all of b's elements.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered_delayed() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (a_send, a_in) = node.sim_input();
    let (b_send, b_in) = node.sim_input();

    let merged_recv = a_in
        .merge_ordered(b_in, nondet!(/** test */))
        .sim_output();

    let mut saw_delayed_interleaving = false;
    flow.sim().exhaustive(async || {
        // Phase 1: a=1, b=3, b=4, then drain what is available.
        a_send.send(1);
        b_send.send(3);
        b_send.send(4);
        let first_batch = merged_recv.collect::<Vec<_>>().await;

        // Phase 2: the delayed a=2.
        a_send.send(2);
        let second_batch = merged_recv.collect::<Vec<_>>().await;

        let mut all = first_batch;
        all.extend(second_batch);

        saw_delayed_interleaving |= all == [1, 3, 4, 2];

        all.sort();
        assert_eq!(all, vec![1, 2, 3, 4]);
    });

    assert!(saw_delayed_interleaving);
}
4269
/// Deploy test: merge_ordered with a delayed element on one input.
/// Sends a=1, b=3, b=4, waits until all three have been emitted, then sends
/// a=2. The first three outputs must be exactly {1, 3, 4}, with b's 3 before
/// its 4, since merge_ordered preserves each input's order. The delayed 2 must
/// arrive last. This demonstrates that both inputs are pulled and that the
/// delayed element is still delivered later.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn deploy_merge_ordered_delayed() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_a_port, input_a) = node.source_external_bincode(&external);
    let (input_b_port, input_b) = node.source_external_bincode(&external);

    let out = input_a
        .assume_ordering(nondet!(/** test */))
        .merge_ordered(
            input_b.assume_ordering(nondet!(/** test */)),
            nondet!(/** test */),
        )
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut ext_a = nodes.connect(input_a_port).await;
    let mut ext_b = nodes.connect(input_b_port).await;
    let mut ext_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    // Send a=1, b=3, b=4
    ext_a.send(1).await.unwrap();
    ext_b.send(3).await.unwrap();
    ext_b.send(4).await.unwrap();

    // Collect the first 3 elements. 2 has not been sent yet, so these must
    // be exactly 1, 3 and 4, in some interleaving.
    let mut received = Vec::new();
    for _ in 0..3 {
        received.push(ext_out.next().await.unwrap());
    }

    // Per-input order must be preserved: b's 3 precedes b's 4.
    let pos_3 = received.iter().position(|v| *v == 3);
    let pos_4 = received.iter().position(|v| *v == 4);
    assert!(
        matches!((pos_3, pos_4), (Some(p3), Some(p4)) if p3 < p4),
        "second input order violated: {:?}",
        received
    );

    let mut first_three = received.clone();
    first_three.sort();
    assert_eq!(
        first_three,
        vec![1, 3, 4],
        "unexpected elements before the delayed send: {:?}",
        received
    );

    // Now send the delayed a=2; it must be the next (and last) element.
    ext_a.send(2).await.unwrap();
    let delayed = ext_out.next().await.unwrap();
    assert_eq!(delayed, 2);
    received.push(delayed);

    // All elements should be present
    received.sort();
    assert_eq!(received, vec![1, 2, 3, 4]);
}
4326
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_fold_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let numbers: super::Stream<_, _> = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    // Running sum, manually declared monotone so it may feed a threshold.
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    let threshold_port = running_sum
        .threshold_greater_or_equal(node.singleton(q!(7)))
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    // The partial sums (1, 3, 6, 10, ...) never equal 7, yet 7 is expected:
    // the emitted value is the threshold itself.
    let crossed = threshold_out.next().await.unwrap();
    assert_eq!(crossed, 7);
}
4367
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_count_threshold() {
    // `count()` is inherently monotone, so it can feed
    // `threshold_greater_or_equal` without a manual proof.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let in_unbounded: super::Stream<_, _> =
        node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    // This is an element count, not a sum; the old name `sum` was misleading.
    let count = in_unbounded.count();

    let threshold_out = count
        .threshold_greater_or_equal(node.singleton(q!(3)))
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_out).await;

    deployment.start().await.unwrap();

    assert_eq!(threshold_out.next().await.unwrap(), 3);
}
4398
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_map_order_preserving_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let numbers: super::Stream<_, _> = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    // An order-preserving map of a monotone singleton stays monotone, so the
    // doubled sum is still accepted by the threshold.
    let doubled_sum = running_sum.map(q!(
        |v| v * 2,
        order_preserving = manual_proof!(/** doubling preserves order */)
    ));

    let threshold_port = doubled_sum
        .threshold_greater_or_equal(node.singleton(q!(14)))
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut threshold_out = nodes.connect(threshold_port).await;

    deployment.start().await.unwrap();

    // Doubled partial sums (2, 6, 12, 20, ...) never equal 14, yet 14 is
    // expected: the emitted value is the threshold itself.
    let crossed = threshold_out.next().await.unwrap();
    assert_eq!(crossed, 14);
}
4445
4446 // === Compile-time type tests for join/cross_product ordering ===
4447
#[cfg(any(feature = "deploy", feature = "sim"))]
mod join_ordering_type_tests {
    //! Never-called functions whose signatures pin the output ordering type of
    //! `join` / `cross_product`. If the ordering inference changes, these stop
    //! compiling.

    use crate::live_collections::boundedness::{Bounded, Unbounded};
    use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
    use crate::location::{Location, Process};

    /// Unbounded ⋈ bounded, both totally ordered: order is preserved.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unbounded ⋈ unbounded: the result degrades to `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Bounded ⋈ bounded at any location: order is preserved.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
        lhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unordered inputs stay unordered.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_noorder_with_bounded<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    // === Compile-time type tests for cross_product ordering ===

    /// Unbounded × bounded, both totally ordered: order is preserved.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Bounded × bounded: order is preserved.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_bounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Unbounded × unbounded: the result degrades to `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }
} // mod join_ordering_type_tests
4512
4513 // === Runtime correctness tests for bounded join/cross_product ===
4514
#[cfg(feature = "sim")]
#[test]
fn cross_product_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    // The right side is batched and re-flattened, so it arrives in
    // non-deterministic chunks. The full product must still be produced.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();
    let per_tick = proc_node.tick();

    let numbers = proc_node.source_iter(q!(vec![1, 2]));
    let letters = proc_node
        .source_iter(q!(vec!['a', 'b']))
        .batch(&per_tick, nondet!(/** test */))
        .all_ticks();

    let product_recv = numbers.cross_product(letters).sim_output();

    flow.sim().exhaustive(async || {
        let expected = vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')];
        product_recv.assert_yields_only_unordered(expected).await;
    });
}
4540
#[cfg(feature = "sim")]
#[test]
fn join_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    // The right side is batched and re-flattened, so it arrives in
    // non-deterministic chunks. Every key must still be joined exactly once.
    let mut flow = FlowBuilder::new();
    let proc_node = flow.process::<()>();
    let per_tick = proc_node.tick();

    let lhs = proc_node.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    let rhs = proc_node
        .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
        .batch(&per_tick, nondet!(/** test */))
        .all_ticks();

    let joined_recv = lhs.join(rhs).sim_output();

    flow.sim().exhaustive(async || {
        let expected = vec![(1, ('a', 'x')), (2, ('b', 'y'))];
        joined_recv.assert_yields_only_unordered(expected).await;
    });
}
4566}