// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds pin down the full `MinOrder` lattice for every
/// ordering marker, so generic code can always compute the weakest of two
/// orderings without extra bounds at the use site.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
// Uninhabited enum: used purely as a type-level marker, never constructed.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
58
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
// Uninhabited enum: used purely as a type-level marker, never constructed.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the
// weakest of the two orderings is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest guarantee, so the minimum with any other
// ordering is that other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest guarantee, so the minimum with anything is `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds pin down the full `MinRetries` lattice for every
/// retry marker, mirroring the structure of [`Ordering`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
106
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
// Uninhabited enum: used purely as a type-level marker, never constructed.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
115
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
// Uninhabited enum: used purely as a type-level marker, never constructed.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the
// weakest of the two retry guarantees is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    // NOTE(review): unlike `MinOrder::Min`, this bound also requires the
    // `WeakerRetryThan` relations directly — presumably to aid inference at
    // use sites; confirm before aligning the two traits.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum with any other
// retry marker is that other marker.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so the minimum with anything is
// `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps this impl out of compiler suggestions so the
// custom `on_unimplemented` message above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
162
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

// `do_not_recommend` keeps this impl out of compiler suggestions so the
// custom `on_unimplemented` message above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is materialized.
    pub(crate) location: Loc,
    // The IR node backing this stream; interior mutability lets operators
    // take ownership of the node (leaving a `Placeholder`) from `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow graph under construction.
    pub(crate) flow_state: FlowState,

    // Carries the marker type parameters without storing values of them.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
// Dropping a stream whose IR node was never consumed attaches the node to a
// `Null` root, so the upstream computation stays reachable in the dataflow
// graph instead of being silently discarded.
impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
    fn drop(&mut self) {
        // Take ownership of the node; a `Placeholder` here means the node was
        // already moved out by an operator, so there is nothing to preserve.
        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
                input: Box::new(ir_node),
                op_metadata: HydroIrOpMetadata::new(),
            });
        }
    }
}
220
// A bounded stream can always be treated as unbounded (the guarantee is
// weakened, never strengthened), so this conversion is lossless and safe.
impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
    for Stream<T, L, Unbounded, O, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
        let new_meta = stream
            .location
            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());

        // Wrap in a `Cast` node: the element values are unchanged, only the
        // collection-kind metadata differs.
        Stream {
            location: stream.location.clone(),
            flow_state: stream.flow_state.clone(),
            ir_node: RefCell::new(HydroNode::Cast {
                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
                metadata: new_meta,
            }),
            _phantom: PhantomData,
        }
    }
}
242
// A totally-ordered stream can always be viewed as unordered (weakening the
// ordering guarantee); delegates to `weaken_ordering`.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
252
// An exactly-once stream can always be viewed as at-least-once (weakening the
// retries guarantee); delegates to `weaken_retries`.
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
262
// Trait plumbing so tick-scoped streams can participate in generic
// `defer_tick` machinery; forwards to the inherent `Stream::defer_tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
271
// Allows a tick-scoped stream to be the receiving end of a cycle: the source
// node is materialized first and completed later via `ReceiverComplete`.
impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
289
// Like `CycleCollection`, but seeds the cycle with an initial stream that is
// emitted only on the first tick.
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Values fed back through the cycle are delayed by one tick, so the
        // first tick observes only `initial`.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // Gate `initial` so it only passes through on the very first tick.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
312
// Closes a tick-scoped cycle by wiring this stream's IR node into the
// matching `CycleSink` root.
impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The sink must live at the same location where the cycle source was
        // created; a mismatch is a programming error.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
334
// Allows any non-tick stream to serve as the placeholder of a forward
// reference, completed later via `ReceiverComplete`.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    fn create_source(cycle_id: CycleId, location: L) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
352
// Resolves a forward reference by wiring this stream's IR node into the
// matching `CycleSink` root.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The sink must live at the same location where the forward reference
        // was created; a mismatch is a programming error.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
374
// Cloning a stream shares the underlying IR node via a `Tee` instead of
// duplicating the upstream computation.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, convert this stream's node into a `Tee` in
        // place, so the original and every clone read from the same shared
        // inner node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // The node is now guaranteed to be a `Tee`; hand out another handle
        // to the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408 L: Location<'a>,
409{
    /// Internal constructor: wraps an already-built IR node at `location`.
    ///
    /// Debug builds verify that the node's metadata matches both the location
    /// and this stream type's collection kind, catching IR-construction bugs
    /// early.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        let flow_state = location.flow_state().clone();
        Stream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
422
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
427
    /// Describes this stream's type-level guarantees (boundedness, ordering,
    /// retries, element type) as a runtime [`CollectionKind`], used to tag IR
    /// node metadata.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
436
437 /// Produces a stream based on invoking `f` on each element.
438 /// If you do not want to modify the stream and instead only want to view
439 /// each item use [`Stream::inspect`] instead.
440 ///
441 /// # Example
442 /// ```rust
443 /// # #[cfg(feature = "deploy")] {
444 /// # use hydro_lang::prelude::*;
445 /// # use futures::StreamExt;
446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
447 /// let words = process.source_iter(q!(vec!["hello", "world"]));
448 /// words.map(q!(|x| x.to_uppercase()))
449 /// # }, |mut stream| async move {
450 /// # for w in vec!["HELLO", "WORLD"] {
451 /// # assert_eq!(stream.next().await.unwrap(), w);
452 /// # }
453 /// # }));
454 /// # }
455 /// ```
456 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
457 where
458 F: Fn(T) -> U + 'a,
459 {
460 let f = f.splice_fn1_ctx(&self.location).into();
461 Stream::new(
462 self.location.clone(),
463 HydroNode::Map {
464 f,
465 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466 metadata: self
467 .location
468 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
469 },
470 )
471 }
472
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        // Output keeps the `O` ordering marker: determinism of the inner
        // iterator is the caller's obligation (see doc above).
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
514
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        // Output is marked `NoOrder` because the inner iterator's order is
        // not assumed to be deterministic.
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
561
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is its own iterator.
        self.flat_map_ordered(q!(|d| d))
    }
590
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is its own iterator.
        self.flat_map_unordered(q!(|d| d))
    }
623
    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
    /// then emit the elements of that stream one by one. When the inner stream yields
    /// `Pending`, this operator yields as well.
    pub fn flat_map_stream_blocking<U, S, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, O, R>
    where
        S: futures::Stream<Item = U>,
        F: Fn(T) -> S + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMapStreamBlocking {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
647
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Identity flat-map: each element is its own inner stream.
        self.flat_map_stream_blocking(q!(|d| d))
    }
657
658 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
659 /// `f`, preserving the order of the elements.
660 ///
661 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
662 /// not modify or take ownership of the values. If you need to modify the values while filtering
663 /// use [`Stream::filter_map`] instead.
664 ///
665 /// # Example
666 /// ```rust
667 /// # #[cfg(feature = "deploy")] {
668 /// # use hydro_lang::prelude::*;
669 /// # use futures::StreamExt;
670 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
671 /// process
672 /// .source_iter(q!(vec![1, 2, 3, 4]))
673 /// .filter(q!(|&x| x > 2))
674 /// # }, |mut stream| async move {
675 /// // 3, 4
676 /// # for w in (3..5) {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// # }
681 /// ```
682 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
683 where
684 F: Fn(&T) -> bool + 'a,
685 {
686 let f = f.splice_fn1_borrow_ctx(&self.location).into();
687 Stream::new(
688 self.location.clone(),
689 HydroNode::Filter {
690 f,
691 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692 metadata: self.location.new_node_metadata(Self::collection_kind()),
693 },
694 )
695 }
696
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    /// .merge_unordered(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // Both outputs share the same upstream node via an Rc, so each element
        // is produced (and the predicate evaluated) only once.
        let shared = SharedNode(Rc::new(RefCell::new(
            self.ir_node.replace(HydroNode::Placeholder),
        )));

        // `is_true` selects which side of the partition this node emits.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
769
770 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
771 ///
772 /// # Example
773 /// ```rust
774 /// # #[cfg(feature = "deploy")] {
775 /// # use hydro_lang::prelude::*;
776 /// # use futures::StreamExt;
777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778 /// process
779 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
780 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
781 /// # }, |mut stream| async move {
782 /// // 1, 2
783 /// # for w in (1..3) {
784 /// # assert_eq!(stream.next().await.unwrap(), w);
785 /// # }
786 /// # }));
787 /// # }
788 /// ```
789 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
790 where
791 F: Fn(T) -> Option<U> + 'a,
792 {
793 let f = f.splice_fn1_ctx(&self.location).into();
794 Stream::new(
795 self.location.clone(),
796 HydroNode::FilterMap {
797 f,
798 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
799 metadata: self
800 .location
801 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
802 },
803 )
804 }
805
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both sides must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
851
852 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
853 ///
854 /// # Example
855 /// ```rust
856 /// # #[cfg(feature = "deploy")] {
857 /// # use hydro_lang::prelude::*;
858 /// # use futures::StreamExt;
859 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860 /// let tick = process.tick();
861 /// // ticks are lazy by default, forces the second tick to run
862 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863 ///
864 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
865 /// let batch_first_tick = process
866 /// .source_iter(q!(vec![1, 2, 3, 4]))
867 /// .batch(&tick, nondet!(/** test */));
868 /// let batch_second_tick = process
869 /// .source_iter(q!(vec![5, 6, 7, 8]))
870 /// .batch(&tick, nondet!(/** test */))
871 /// .defer_tick();
872 /// batch_first_tick.chain(batch_second_tick)
873 /// .filter_if(signal)
874 /// .all_ticks()
875 /// # }, |mut stream| async move {
876 /// // [1, 2, 3, 4]
877 /// # for w in vec![1, 2, 3, 4] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// # }
882 /// ```
883 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
884 self.cross_singleton(signal.filter(q!(|b| *b)))
885 .map(q!(|(d, _)| d))
886 }
887
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    /// .source_iter(q!(vec![5, 6, 7, 8]))
    /// .batch(&tick, nondet!(/** test */))
    /// .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    /// .filter_if_some(some_on_first_tick)
    /// .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(signal.is_some())
    }
926
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
928 ///
929 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
930 /// some local state.
931 ///
932 /// # Example
933 /// ```rust
934 /// # #[cfg(feature = "deploy")] {
935 /// # use hydro_lang::prelude::*;
936 /// # use futures::StreamExt;
937 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938 /// let tick = process.tick();
939 /// // ticks are lazy by default, forces the second tick to run
940 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
941 ///
942 /// let batch_first_tick = process
943 /// .source_iter(q!(vec![1, 2, 3, 4]))
944 /// .batch(&tick, nondet!(/** test */));
945 /// let batch_second_tick = process
946 /// .source_iter(q!(vec![5, 6, 7, 8]))
947 /// .batch(&tick, nondet!(/** test */))
948 /// .defer_tick(); // appears on the second tick
949 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
950 /// batch_first_tick.chain(batch_second_tick)
951 /// .filter_if_none(some_on_first_tick)
952 /// .all_ticks()
953 /// # }, |mut stream| async move {
954 /// // [5, 6, 7, 8]
955 /// # for w in vec![5, 6, 7, 8] {
956 /// # assert_eq!(stream.next().await.unwrap(), w);
957 /// # }
958 /// # }));
959 /// # }
960 /// ```
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Invert the presence test and delegate to `filter_if`.
        self.filter_if(other.is_none())
    }
965
966 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
967 /// tupled pairs in a non-deterministic order.
968 ///
969 /// # Example
970 /// ```rust
971 /// # #[cfg(feature = "deploy")] {
972 /// # use hydro_lang::prelude::*;
973 /// # use std::collections::HashSet;
974 /// # use futures::StreamExt;
975 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976 /// let tick = process.tick();
977 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
978 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
979 /// stream1.cross_product(stream2)
980 /// # }, |mut stream| async move {
981 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
982 /// # stream.map(|i| assert!(expected.contains(&i)));
983 /// # }));
984 /// # }
985 /// ```
986 pub fn cross_product<T2, O2: Ordering>(
987 self,
988 other: Stream<T2, L, B, O2, R>,
989 ) -> Stream<(T, T2), L, B, NoOrder, R>
990 where
991 T: Clone,
992 T2: Clone,
993 {
994 check_matching_location(&self.location, &other.location);
995
996 Stream::new(
997 self.location.clone(),
998 HydroNode::CrossProduct {
999 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1000 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1001 metadata: self
1002 .location
1003 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
1004 },
1005 )
1006 }
1007
1008 /// Takes one stream as input and filters out any duplicate occurrences. The output
1009 /// contains all unique values from the input.
1010 ///
1011 /// # Example
1012 /// ```rust
1013 /// # #[cfg(feature = "deploy")] {
1014 /// # use hydro_lang::prelude::*;
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1017 /// let tick = process.tick();
1018 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1019 /// # }, |mut stream| async move {
1020 /// # for w in vec![1, 2, 3, 4] {
1021 /// # assert_eq!(stream.next().await.unwrap(), w);
1022 /// # }
1023 /// # }));
1024 /// # }
1025 /// ```
1026 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1027 where
1028 T: Eq + Hash,
1029 {
1030 Stream::new(
1031 self.location.clone(),
1032 HydroNode::Unique {
1033 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1034 metadata: self
1035 .location
1036 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1037 },
1038 )
1039 }
1040
1041 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1042 ///
1043 /// The `other` stream must be [`Bounded`], since this function will wait until
1044 /// all its elements are available before producing any output.
1045 /// # Example
1046 /// ```rust
1047 /// # #[cfg(feature = "deploy")] {
1048 /// # use hydro_lang::prelude::*;
1049 /// # use futures::StreamExt;
1050 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1051 /// let tick = process.tick();
1052 /// let stream = process
1053 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1054 /// .batch(&tick, nondet!(/** test */));
1055 /// let batch = process
1056 /// .source_iter(q!(vec![1, 2]))
1057 /// .batch(&tick, nondet!(/** test */));
1058 /// stream.filter_not_in(batch).all_ticks()
1059 /// # }, |mut stream| async move {
1060 /// # for w in vec![3, 4] {
1061 /// # assert_eq!(stream.next().await.unwrap(), w);
1062 /// # }
1063 /// # }));
1064 /// # }
1065 /// ```
    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
    where
        T: Eq + Hash,
        B2: IsBounded,
    {
        // Anti-join: emit elements of `self` (pos) that never appear in `other`
        // (neg). `other` must be bounded so the negative side can be fully
        // materialized before output is produced.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): the metadata is tagged `Bounded` while the method
                // returns `Self` (boundedness `B`) — confirm this is intentional.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
            },
        )
    }
1084
1085 /// An operator which allows you to "inspect" each element of a stream without
1086 /// modifying it. The closure `f` is called on a reference to each item. This is
1087 /// mainly useful for debugging, and should not be used to generate side-effects.
1088 ///
1089 /// # Example
1090 /// ```rust
1091 /// # #[cfg(feature = "deploy")] {
1092 /// # use hydro_lang::prelude::*;
1093 /// # use futures::StreamExt;
1094 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095 /// let nums = process.source_iter(q!(vec![1, 2]));
1096 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1097 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1098 /// # }, |mut stream| async move {
1099 /// # for w in vec![1, 2] {
1100 /// # assert_eq!(stream.next().await.unwrap(), w);
1101 /// # }
1102 /// # }));
1103 /// # }
1104 /// ```
1105 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1106 where
1107 F: Fn(&T) + 'a,
1108 {
1109 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1110
1111 Stream::new(
1112 self.location.clone(),
1113 HydroNode::Inspect {
1114 f,
1115 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1116 metadata: self.location.new_node_metadata(Self::collection_kind()),
1117 },
1118 )
1119 }
1120
1121 /// Executes the provided closure for every element in this stream.
1122 ///
1123 /// Because the closure may have side effects, the stream must have deterministic order
1124 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1125 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1126 /// [`Stream::assume_retries`] with an explanation for why this is the case.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        // Terminal operator: registers a root in the flow graph rather than
        // producing a downstream live collection.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::ForEach {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                f,
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1142
1143 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1144 /// TCP socket to some other server. You should _not_ use this API for interacting with
1145 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1146 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1147 /// interaction with asynchronous sinks.
    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        S: 'a + futures::Sink<T> + Unpin,
    {
        // Terminal operator: splice the sink expression and register a root that
        // drains this stream into it.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1163
1164 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1165 ///
1166 /// # Example
1167 /// ```rust
1168 /// # #[cfg(feature = "deploy")] {
1169 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1170 /// # use futures::StreamExt;
1171 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1172 /// let tick = process.tick();
1173 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1174 /// numbers.enumerate()
1175 /// # }, |mut stream| async move {
1176 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1177 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1178 /// # assert_eq!(stream.next().await.unwrap(), w);
1179 /// # }
1180 /// # }));
1181 /// # }
1182 /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): metadata is built with `TotalOrder, ExactlyOnce`
                // while the return type carries `O, R`; the `IsOrdered` /
                // `IsExactlyOnce` bounds suggest these coincide — confirm.
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }
1202
1203 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1204 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1205 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1206 ///
1207 /// Depending on the input stream guarantees, the closure may need to be commutative
1208 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1209 ///
1210 /// # Example
1211 /// ```rust
1212 /// # #[cfg(feature = "deploy")] {
1213 /// # use hydro_lang::prelude::*;
1214 /// # use futures::StreamExt;
1215 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1217 /// words
1218 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1219 /// .into_stream()
1220 /// # }, |mut stream| async move {
1221 /// // "HELLOWORLD"
1222 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1223 /// # }));
1224 /// # }
1225 /// ```
    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> Singleton<A, L, B2>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneStream<M, B2>,
    {
        // Splice the initializer (nullary) and the combinator (binary with &mut
        // accumulator), registering the algebraic proof attached to the combinator.
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // Justified by the `C`/`Idemp` bounds: the combinator tolerates this
        // stream's ordering and retry guarantees.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
        };

        Singleton::new(ordered_etc.location.clone(), core)
    }
1256
1257 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260 /// reference, so that it can be modified in place.
1261 ///
1262 /// Depending on the input stream guarantees, the closure may need to be commutative
1263 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1264 ///
1265 /// # Example
1266 /// ```rust
1267 /// # #[cfg(feature = "deploy")] {
1268 /// # use hydro_lang::prelude::*;
1269 /// # use futures::StreamExt;
1270 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271 /// let bools = process.source_iter(q!(vec![false, true, false]));
1272 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1273 /// # }, |mut stream| async move {
1274 /// // true
1275 /// # assert_eq!(stream.next().await.unwrap(), true);
1276 /// # }));
1277 /// # }
1278 /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator and register its attached algebraic proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Justified by the `C`/`Idemp` bounds: the combinator tolerates this
        // stream's ordering and retry guarantees.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location.clone(), core)
    }
1304
1305 /// Computes the maximum element in the stream as an [`Optional`], which
1306 /// will be empty until the first element in the input arrives.
1307 ///
1308 /// # Example
1309 /// ```rust
1310 /// # #[cfg(feature = "deploy")] {
1311 /// # use hydro_lang::prelude::*;
1312 /// # use futures::StreamExt;
1313 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1314 /// let tick = process.tick();
1315 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1316 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1317 /// batch.max().all_ticks()
1318 /// # }, |mut stream| async move {
1319 /// // 4
1320 /// # assert_eq!(stream.next().await.unwrap(), 4);
1321 /// # }));
1322 /// # }
1323 /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // `max` tolerates duplicates (idempotent) and any arrival order
        // (commutative), so the guarantees are strengthened via the trusted
        // escape hatches before reducing.
        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
            .assume_ordering_trusted_bounded::<TotalOrder>(
                nondet!(/** max is commutative, but order affects intermediates */),
            )
            .reduce(q!(|curr, new| {
                if new > *curr {
                    *curr = new;
                }
            }))
    }
1338
1339 /// Computes the minimum element in the stream as an [`Optional`], which
1340 /// will be empty until the first element in the input arrives.
1341 ///
1342 /// # Example
1343 /// ```rust
1344 /// # #[cfg(feature = "deploy")] {
1345 /// # use hydro_lang::prelude::*;
1346 /// # use futures::StreamExt;
1347 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348 /// let tick = process.tick();
1349 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1350 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1351 /// batch.min().all_ticks()
1352 /// # }, |mut stream| async move {
1353 /// // 1
1354 /// # assert_eq!(stream.next().await.unwrap(), 1);
1355 /// # }));
1356 /// # }
1357 /// ```
1358 pub fn min(self) -> Optional<T, L, B>
1359 where
1360 T: Ord,
1361 {
1362 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1363 .assume_ordering_trusted_bounded::<TotalOrder>(
1364 nondet!(/** max is commutative, but order affects intermediates */),
1365 )
1366 .reduce(q!(|curr, new| {
1367 if new < *curr {
1368 *curr = new;
1369 }
1370 }))
1371 }
1372
1373 /// Computes the first element in the stream as an [`Optional`], which
1374 /// will be empty until the first element in the input arrives.
1375 ///
1376 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1377 /// re-ordering of elements may cause the first element to change.
1378 ///
1379 /// # Example
1380 /// ```rust
1381 /// # #[cfg(feature = "deploy")] {
1382 /// # use hydro_lang::prelude::*;
1383 /// # use futures::StreamExt;
1384 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1385 /// let tick = process.tick();
1386 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1387 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1388 /// batch.first().all_ticks()
1389 /// # }, |mut stream| async move {
1390 /// // 1
1391 /// # assert_eq!(stream.next().await.unwrap(), 1);
1392 /// # }));
1393 /// # }
1394 /// ```
    pub fn first(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        // The generator `Return`s on the very first element, terminating the
        // stream; the no-op `reduce` then lifts that single element into an
        // `Optional`.
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
            .reduce(q!(|_, _| {}))
    }
1404
1405 /// Computes the last element in the stream as an [`Optional`], which
1406 /// will be empty until an element in the input arrives.
1407 ///
1408 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1409 /// re-ordering of elements may cause the last element to change.
1410 ///
1411 /// # Example
1412 /// ```rust
1413 /// # #[cfg(feature = "deploy")] {
1414 /// # use hydro_lang::prelude::*;
1415 /// # use futures::StreamExt;
1416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1417 /// let tick = process.tick();
1418 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1419 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1420 /// batch.last().all_ticks()
1421 /// # }, |mut stream| async move {
1422 /// // 4
1423 /// # assert_eq!(stream.next().await.unwrap(), 4);
1424 /// # }));
1425 /// # }
1426 /// ```
    pub fn last(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        // Each new element overwrites the accumulator, so the reduction
        // converges to the most recently seen element.
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
            .reduce(q!(|curr, new| *curr = new))
    }
1435
1436 /// Returns a stream containing at most the first `n` elements of the input stream,
1437 /// preserving the original order. Similar to `LIMIT` in SQL.
1438 ///
1439 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1440 /// retries, since the result depends on the order and cardinality of elements.
1441 ///
1442 /// # Example
1443 /// ```rust
1444 /// # #[cfg(feature = "deploy")] {
1445 /// # use hydro_lang::prelude::*;
1446 /// # use futures::StreamExt;
1447 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1448 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1449 /// numbers.limit(q!(3))
1450 /// # }, |mut stream| async move {
1451 /// // 10, 20, 30
1452 /// # for w in vec![10, 20, 30] {
1453 /// # assert_eq!(stream.next().await.unwrap(), w);
1454 /// # }
1455 /// # }));
1456 /// # }
1457 /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        // State machine over a running count: yield the first `n - 1` elements,
        // `Return` (emit and terminate) the `n`-th, and `Break` immediately when
        // `n == 0` so nothing is emitted.
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1482
1483 /// Collects all the elements of this stream into a single [`Vec`] element.
1484 ///
1485 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1486 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1487 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1488 /// the vector at an arbitrary point in time.
1489 ///
1490 /// # Example
1491 /// ```rust
1492 /// # #[cfg(feature = "deploy")] {
1493 /// # use hydro_lang::prelude::*;
1494 /// # use futures::StreamExt;
1495 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1496 /// let tick = process.tick();
1497 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1498 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1499 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1500 /// # }, |mut stream| async move {
1501 /// // [ vec![1, 2, 3, 4] ]
1502 /// # for w in vec![vec![1, 2, 3, 4]] {
1503 /// # assert_eq!(stream.next().await.unwrap(), w);
1504 /// # }
1505 /// # }));
1506 /// # }
1507 /// ```
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        // The `IsOrdered`/`IsExactlyOnce` bounds allow canonicalizing to
        // `TotalOrder`/`ExactlyOnce` before folding every element into a `Vec`.
        self.make_totally_ordered().make_exactly_once().fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }
1520
1521 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1522 /// and emitting each intermediate result.
1523 ///
1524 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1525 /// containing all intermediate accumulated values. The scan operation can also terminate early
1526 /// by returning `None`.
1527 ///
1528 /// The function takes a mutable reference to the accumulator and the current element, and returns
1529 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1530 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1531 ///
1532 /// # Examples
1533 ///
1534 /// Basic usage - running sum:
1535 /// ```rust
1536 /// # #[cfg(feature = "deploy")] {
1537 /// # use hydro_lang::prelude::*;
1538 /// # use futures::StreamExt;
1539 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1540 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1541 /// q!(|| 0),
1542 /// q!(|acc, x| {
1543 /// *acc += x;
1544 /// Some(*acc)
1545 /// }),
1546 /// )
1547 /// # }, |mut stream| async move {
1548 /// // Output: 1, 3, 6, 10
1549 /// # for w in vec![1, 3, 6, 10] {
1550 /// # assert_eq!(stream.next().await.unwrap(), w);
1551 /// # }
1552 /// # }));
1553 /// # }
1554 /// ```
1555 ///
1556 /// Early termination example:
1557 /// ```rust
1558 /// # #[cfg(feature = "deploy")] {
1559 /// # use hydro_lang::prelude::*;
1560 /// # use futures::StreamExt;
1561 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1562 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1563 /// q!(|| 1),
1564 /// q!(|state, x| {
1565 /// *state = *state * x;
1566 /// if *state > 6 {
1567 /// None // Terminate the stream
1568 /// } else {
1569 /// Some(-*state)
1570 /// }
1571 /// }),
1572 /// )
1573 /// # }, |mut stream| async move {
1574 /// // Output: -1, -2, -6
1575 /// # for w in vec![-1, -2, -6] {
1576 /// # assert_eq!(stream.next().await.unwrap(), w);
1577 /// # }
1578 /// # }));
1579 /// # }
1580 /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Option<U> + 'a,
    {
        // Splice the initializer (nullary) and the step function (which takes the
        // accumulator by &mut and may terminate the stream by returning None).
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Scan {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1607
1608 /// Iteratively processes the elements of the stream using a state machine that can yield
1609 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1610 /// syntax in Rust, without requiring special syntax.
1611 ///
1612 /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1613 /// state. The second argument defines the processing logic, taking in a mutable reference
1614 /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1615 /// variants define what is emitted and whether further inputs should be processed.
1616 ///
1617 /// # Example
1618 /// ```rust
1619 /// # #[cfg(feature = "deploy")] {
1620 /// # use hydro_lang::prelude::*;
1621 /// # use futures::StreamExt;
1622 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1623 /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1624 /// q!(|| 0),
1625 /// q!(|acc, x| {
1626 /// *acc += x;
1627 /// if *acc > 100 {
1628 /// hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1629 /// } else if *acc % 2 == 0 {
1630 /// hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1631 /// } else {
1632 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1633 /// }
1634 /// }),
1635 /// )
1636 /// # }, |mut stream| async move {
1637 /// // Output: "even", "done!"
1638 /// # let mut results = Vec::new();
1639 /// # for _ in 0..2 {
1640 /// # results.push(stream.next().await.unwrap());
1641 /// # }
1642 /// # results.sort();
1643 /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1644 /// # }));
1645 /// # }
1646 /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Defer splicing so the user closures can be invoked from inside the
        // quoted scan body below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        // None = not yet initialized
        // Some(Some(a)) = active with state a
        // Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan emits Option<U>: Some(Some(u)) is an emitted value,
        // Some(None) emits nothing but keeps scanning, and returning None from
        // the scan terminates it.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the Option<U> markers away, yielding only emitted values.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1720
1721 /// Given a time interval, returns a stream corresponding to samples taken from the
1722 /// stream roughly at that interval. The output will have elements in the same order
1723 /// as the input, but with arbitrary elements skipped between samples. There is also
1724 /// no guarantee on the exact timing of the samples.
1725 ///
1726 /// # Non-Determinism
1727 /// The output stream is non-deterministic in which elements are sampled, since this
1728 /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
    where
        L: NoTick + NoAtomic,
    {
        // Timer stream that fires roughly once per interval.
        let samples = self.location.source_interval(interval, nondet);

        // Release a tick's batch only when the timer fired during that tick;
        // batches without a timer event are dropped (the "sampling").
        let tick = self.location.tick();
        self.batch(&tick, nondet)
            .filter_if(samples.batch(&tick, nondet).first().is_some())
            .all_ticks()
            .weaken_retries()
    }
1745
1746 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1747 /// stream has not emitted a value since that duration.
1748 ///
1749 /// # Non-Determinism
1750 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1751 /// samples take place, timeouts may be non-deterministically generated or missed,
1752 /// and the notification of the timeout may be delayed as well. There is also no
1753 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1754 /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent element (None until the
        // first element arrives).
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Sample the tracker each tick; emit Some(()) when no element has ever
        // arrived, or when the last arrival is older than `duration`.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1790
1791 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1792 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1793 ///
1794 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1795 /// processed before an acknowledgement is emitted.
1796 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1797 let id = self.location.flow_state().borrow_mut().next_clock_id();
1798 let out_location = Atomic {
1799 tick: Tick {
1800 id,
1801 l: self.location.clone(),
1802 },
1803 };
1804 Stream::new(
1805 out_location.clone(),
1806 HydroNode::BeginAtomic {
1807 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1808 metadata: out_location
1809 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1810 },
1811 )
1812 }
1813
1814 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1815 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1816 /// the order of the input. The output stream will execute in the [`Tick`] that was
1817 /// used to create the atomic section.
1818 ///
1819 /// # Non-Determinism
1820 /// The batch boundaries are non-deterministic and may change across executions.
1821 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1822 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1823 Stream::new(
1824 tick.clone(),
1825 HydroNode::Batch {
1826 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1827 metadata: tick
1828 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1829 },
1830 )
1831 }
1832
1833 /// An operator which allows you to "name" a `HydroNode`.
1834 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1835 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1836 {
1837 let mut node = self.ir_node.borrow_mut();
1838 let metadata = node.metadata_mut();
1839 metadata.tag = Some(name.to_owned());
1840 }
1841 self
1842 }
1843
1844 /// Explicitly "casts" the stream to a type with a different ordering
1845 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1846 /// by the type-system.
1847 ///
1848 /// # Non-Determinism
1849 /// This function is used as an escape hatch, and any mistakes in the
1850 /// provided ordering guarantee will propagate into the guarantees
1851 /// for the rest of the program.
1852 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1853 if O::ORDERING_KIND == O2::ORDERING_KIND {
1854 Stream::new(
1855 self.location.clone(),
1856 self.ir_node.replace(HydroNode::Placeholder),
1857 )
1858 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1859 // We can always weaken the ordering guarantee
1860 Stream::new(
1861 self.location.clone(),
1862 HydroNode::Cast {
1863 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1864 metadata: self
1865 .location
1866 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1867 },
1868 )
1869 } else {
1870 Stream::new(
1871 self.location.clone(),
1872 HydroNode::ObserveNonDet {
1873 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1874 trusted: false,
1875 metadata: self
1876 .location
1877 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1878 },
1879 )
1880 }
1881 }
1882
1883 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1884 // intermediate states will not be revealed
1885 fn assume_ordering_trusted_bounded<O2: Ordering>(
1886 self,
1887 nondet: NonDet,
1888 ) -> Stream<T, L, B, O2, R> {
1889 if B::BOUNDED {
1890 self.assume_ordering_trusted(nondet)
1891 } else {
1892 self.assume_ordering(nondet)
1893 }
1894 }
1895
1896 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1897 // is not observable
1898 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1899 self,
1900 _nondet: NonDet,
1901 ) -> Stream<T, L, B, O2, R> {
1902 if O::ORDERING_KIND == O2::ORDERING_KIND {
1903 Stream::new(
1904 self.location.clone(),
1905 self.ir_node.replace(HydroNode::Placeholder),
1906 )
1907 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1908 // We can always weaken the ordering guarantee
1909 Stream::new(
1910 self.location.clone(),
1911 HydroNode::Cast {
1912 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1913 metadata: self
1914 .location
1915 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1916 },
1917 )
1918 } else {
1919 Stream::new(
1920 self.location.clone(),
1921 HydroNode::ObserveNonDet {
1922 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1923 trusted: true,
1924 metadata: self
1925 .location
1926 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1927 },
1928 )
1929 }
1930 }
1931
    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    ///
    /// Deprecated: prefer [`Stream::weaken_ordering`] with an explicit target.
    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        self.weaken_ordering::<NoOrder>()
    }
1938
1939 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1940 /// enforcing that `O2` is weaker than the input ordering guarantee.
1941 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1942 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1943 self.assume_ordering::<O2>(nondet)
1944 }
1945
    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
    /// implies that `O == TotalOrder`.
    ///
    /// This is a type-level conversion only; since the ordering kinds already match,
    /// no IR node is added and nothing changes at runtime.
    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
    where
        O: IsOrdered,
    {
        self.assume_ordering(nondet!(/** no-op */))
    }
1954
1955 /// Explicitly "casts" the stream to a type with a different retries
1956 /// guarantee. Useful in unsafe code where the lack of retries cannot
1957 /// be proven by the type-system.
1958 ///
1959 /// # Non-Determinism
1960 /// This function is used as an escape hatch, and any mistakes in the
1961 /// provided retries guarantee will propagate into the guarantees
1962 /// for the rest of the program.
1963 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1964 if R::RETRIES_KIND == R2::RETRIES_KIND {
1965 Stream::new(
1966 self.location.clone(),
1967 self.ir_node.replace(HydroNode::Placeholder),
1968 )
1969 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1970 // We can always weaken the retries guarantee
1971 Stream::new(
1972 self.location.clone(),
1973 HydroNode::Cast {
1974 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1975 metadata: self
1976 .location
1977 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1978 },
1979 )
1980 } else {
1981 Stream::new(
1982 self.location.clone(),
1983 HydroNode::ObserveNonDet {
1984 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1985 trusted: false,
1986 metadata: self
1987 .location
1988 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1989 },
1990 )
1991 }
1992 }
1993
1994 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1995 // is not observable
1996 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1997 if R::RETRIES_KIND == R2::RETRIES_KIND {
1998 Stream::new(
1999 self.location.clone(),
2000 self.ir_node.replace(HydroNode::Placeholder),
2001 )
2002 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2003 // We can always weaken the retries guarantee
2004 Stream::new(
2005 self.location.clone(),
2006 HydroNode::Cast {
2007 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2008 metadata: self
2009 .location
2010 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2011 },
2012 )
2013 } else {
2014 Stream::new(
2015 self.location.clone(),
2016 HydroNode::ObserveNonDet {
2017 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2018 trusted: true,
2019 metadata: self
2020 .location
2021 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2022 },
2023 )
2024 }
2025 }
2026
    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    ///
    /// Deprecated: prefer [`Stream::weaken_retries`] with an explicit target.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        self.weaken_retries::<AtLeastOnce>()
    }
2033
2034 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2035 /// enforcing that `R2` is weaker than the input retries guarantee.
2036 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2037 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2038 self.assume_retries::<R2>(nondet)
2039 }
2040
    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
    /// implies that `R == ExactlyOnce`.
    ///
    /// This is a type-level conversion only; since the retries kinds already match,
    /// no IR node is added and nothing changes at runtime.
    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        R: IsExactlyOnce,
    {
        self.assume_retries(nondet!(/** no-op */))
    }
2049
2050 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2051 /// implies that `B == Bounded`.
2052 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2053 where
2054 B: IsBounded,
2055 {
2056 Stream::new(
2057 self.location.clone(),
2058 self.ir_node.replace(HydroNode::Placeholder),
2059 )
2060 }
2061}
2062
2063impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2064where
2065 L: Location<'a>,
2066{
2067 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2068 ///
2069 /// # Example
2070 /// ```rust
2071 /// # #[cfg(feature = "deploy")] {
2072 /// # use hydro_lang::prelude::*;
2073 /// # use futures::StreamExt;
2074 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2075 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2076 /// # }, |mut stream| async move {
2077 /// // 1, 2, 3
2078 /// # for w in vec![1, 2, 3] {
2079 /// # assert_eq!(stream.next().await.unwrap(), w);
2080 /// # }
2081 /// # }));
2082 /// # }
2083 /// ```
2084 pub fn cloned(self) -> Stream<T, L, B, O, R>
2085 where
2086 T: Clone,
2087 {
2088 self.map(q!(|d| d.clone()))
2089 }
2090}
2091
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// Only available on streams with an `ExactlyOnce` retries guarantee, since a
    /// retried element would be counted again.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
        // Counting only increments per element, so element order cannot affect
        // either the final count or any intermediate state.
        self.assume_ordering_trusted::<TotalOrder>(nondet!(
            /// Order does not affect eventual count, and also does not affect intermediate states.
        ))
        .fold(
            q!(|| 0usize),
            q!(
                |count, _| *count += 1,
                monotone = manual_proof!(/** += 1 is monotone */)
            ),
        )
    }
}
2127
2128impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2129 /// Produces a new stream that merges the elements of the two input streams.
2130 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2131 ///
2132 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2133 /// [`Bounded`], you can use [`Stream::chain`] instead.
2134 ///
2135 /// # Example
2136 /// ```rust
2137 /// # #[cfg(feature = "deploy")] {
2138 /// # use hydro_lang::prelude::*;
2139 /// # use futures::StreamExt;
2140 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2141 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2142 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2143 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2144 /// # }, |mut stream| async move {
2145 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2146 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2147 /// # assert_eq!(stream.next().await.unwrap(), w);
2148 /// # }
2149 /// # }));
2150 /// # }
2151 /// ```
2152 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2153 self,
2154 other: Stream<T, L, Unbounded, O2, R2>,
2155 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2156 where
2157 R: MinRetries<R2>,
2158 {
2159 Stream::new(
2160 self.location.clone(),
2161 HydroNode::Chain {
2162 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2163 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2164 metadata: self.location.new_node_metadata(Stream::<
2165 T,
2166 L,
2167 Unbounded,
2168 NoOrder,
2169 <R as MinRetries<R2>>::Min,
2170 >::collection_kind()),
2171 },
2172 )
2173 }
2174
2175 /// Deprecated: use [`Stream::merge_unordered`] instead.
2176 #[deprecated(note = "use `merge_unordered` instead")]
2177 pub fn interleave<O2: Ordering, R2: Retries>(
2178 self,
2179 other: Stream<T, L, Unbounded, O2, R2>,
2180 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2181 where
2182 R: MinRetries<R2>,
2183 {
2184 self.merge_unordered(other)
2185 }
2186}
2187
2188impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2189 /// Produces a new stream that combines the elements of the two input streams,
2190 /// preserving the relative order of elements within each input.
2191 ///
2192 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2193 /// [`Bounded`], you can use [`Stream::chain`] instead.
2194 ///
2195 /// # Non-Determinism
2196 /// The order in which elements *across* the two streams will be interleaved is
2197 /// non-deterministic, so the order of elements will vary across runs. If the output order
2198 /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2199 /// unordered stream.
2200 ///
2201 /// # Example
2202 /// ```rust
2203 /// # #[cfg(feature = "deploy")] {
2204 /// # use hydro_lang::prelude::*;
2205 /// # use futures::StreamExt;
2206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2207 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2208 /// # process.source_iter(q!(vec![1, 3])).into();
2209 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2210 /// # }, |mut stream| async move {
2211 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2212 /// # for w in vec![1, 3, 2, 4] {
2213 /// # assert_eq!(stream.next().await.unwrap(), w);
2214 /// # }
2215 /// # }));
2216 /// # }
2217 /// ```
2218 pub fn merge_ordered<R2: Retries>(
2219 self,
2220 other: Stream<T, L, Unbounded, TotalOrder, R2>,
2221 _nondet: NonDet,
2222 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2223 where
2224 R: MinRetries<R2>,
2225 {
2226 Stream::new(
2227 self.location.clone(),
2228 HydroNode::Chain {
2229 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2230 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2231 metadata: self.location.new_node_metadata(Stream::<
2232 T,
2233 L,
2234 Unbounded,
2235 TotalOrder,
2236 <R as MinRetries<R2>>::Min,
2237 >::collection_kind()),
2238 },
2239 )
2240 }
2241}
2242
2243impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2244where
2245 L: Location<'a>,
2246{
2247 /// Produces a new stream that emits the input elements in sorted order.
2248 ///
2249 /// The input stream can have any ordering guarantee, but the output stream
2250 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2251 /// elements in the input stream are available, so it requires the input stream
2252 /// to be [`Bounded`].
2253 ///
2254 /// # Example
2255 /// ```rust
2256 /// # #[cfg(feature = "deploy")] {
2257 /// # use hydro_lang::prelude::*;
2258 /// # use futures::StreamExt;
2259 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2260 /// let tick = process.tick();
2261 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2262 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2263 /// batch.sort().all_ticks()
2264 /// # }, |mut stream| async move {
2265 /// // 1, 2, 3, 4
2266 /// # for w in (1..5) {
2267 /// # assert_eq!(stream.next().await.unwrap(), w);
2268 /// # }
2269 /// # }));
2270 /// # }
2271 /// ```
2272 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2273 where
2274 B: IsBounded,
2275 T: Ord,
2276 {
2277 let this = self.make_bounded();
2278 Stream::new(
2279 this.location.clone(),
2280 HydroNode::Sort {
2281 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2282 metadata: this
2283 .location
2284 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2285 },
2286 )
2287 }
2288
2289 /// Produces a new stream that first emits the elements of the `self` stream,
2290 /// and then emits the elements of the `other` stream. The output stream has
2291 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2292 /// [`TotalOrder`] guarantee.
2293 ///
2294 /// Currently, both input streams must be [`Bounded`]. This operator will block
2295 /// on the first stream until all its elements are available. In a future version,
2296 /// we will relax the requirement on the `other` stream.
2297 ///
2298 /// # Example
2299 /// ```rust
2300 /// # #[cfg(feature = "deploy")] {
2301 /// # use hydro_lang::prelude::*;
2302 /// # use futures::StreamExt;
2303 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2304 /// let tick = process.tick();
2305 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2306 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2307 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2308 /// # }, |mut stream| async move {
2309 /// // 2, 3, 4, 5, 1, 2, 3, 4
2310 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2311 /// # assert_eq!(stream.next().await.unwrap(), w);
2312 /// # }
2313 /// # }));
2314 /// # }
2315 /// ```
2316 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2317 self,
2318 other: Stream<T, L, B2, O2, R2>,
2319 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2320 where
2321 B: IsBounded,
2322 O: MinOrder<O2>,
2323 R: MinRetries<R2>,
2324 {
2325 check_matching_location(&self.location, &other.location);
2326
2327 Stream::new(
2328 self.location.clone(),
2329 HydroNode::Chain {
2330 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2331 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2332 metadata: self.location.new_node_metadata(Stream::<
2333 T,
2334 L,
2335 B2,
2336 <O as MinOrder<O2>>::Min,
2337 <R as MinRetries<R2>>::Min,
2338 >::collection_kind()),
2339 },
2340 )
2341 }
2342
2343 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2344 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2345 /// because this is compiled into a nested loop.
2346 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2347 self,
2348 other: Stream<T2, L, Bounded, O2, R>,
2349 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2350 where
2351 B: IsBounded,
2352 T: Clone,
2353 T2: Clone,
2354 {
2355 let this = self.make_bounded();
2356 check_matching_location(&this.location, &other.location);
2357
2358 Stream::new(
2359 this.location.clone(),
2360 HydroNode::CrossProduct {
2361 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2362 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2363 metadata: this.location.new_node_metadata(Stream::<
2364 (T, T2),
2365 L,
2366 Bounded,
2367 <O2 as MinOrder<O>>::Min,
2368 R,
2369 >::collection_kind()),
2370 },
2371 )
2372 }
2373
    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// # .source_iter(q!(vec![(1, ()), (2, ())]))
    /// # .into_keyed()
    /// # .batch(&tick, nondet!(/** test */))
    /// # .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// # .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pair every key with every element of `self` via a nested-loop cross
        // product, then group the resulting `(key, value)` pairs by key.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }
2428
2429 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2430 /// execution until all results are available. The output order is based on when futures
2431 /// complete, and may be different than the input order.
2432 ///
2433 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2434 /// while futures are pending, this variant blocks until the futures resolve.
2435 ///
2436 /// # Example
2437 /// ```rust
2438 /// # #[cfg(feature = "deploy")] {
2439 /// # use std::collections::HashSet;
2440 /// # use futures::StreamExt;
2441 /// # use hydro_lang::prelude::*;
2442 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2443 /// process
2444 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2445 /// .map(q!(|x| async move {
2446 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2447 /// x
2448 /// }))
2449 /// .resolve_futures_blocking()
2450 /// # },
2451 /// # |mut stream| async move {
2452 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2453 /// # let mut output = HashSet::new();
2454 /// # for _ in 1..10 {
2455 /// # output.insert(stream.next().await.unwrap());
2456 /// # }
2457 /// # assert_eq!(
2458 /// # output,
2459 /// # HashSet::<i32>::from_iter(1..10)
2460 /// # );
2461 /// # },
2462 /// # ));
2463 /// # }
2464 /// ```
2465 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2466 where
2467 T: Future,
2468 {
2469 Stream::new(
2470 self.location.clone(),
2471 HydroNode::ResolveFuturesBlocking {
2472 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2473 metadata: self
2474 .location
2475 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2476 },
2477 )
2478 }
2479
    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
    ///
    /// Requires a [`Bounded`] input, since emptiness cannot be decided while more
    /// elements may still arrive.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let empty: Stream<i32, _, Bounded> = process
    ///     .source_iter(q!(Vec::<i32>::new()))
    ///     .batch(&tick, nondet!(/** test */));
    /// empty.is_empty().all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
    where
        B: IsBounded,
    {
        // Only the existence of *some* element matters, so assuming `TotalOrder`
        // cannot leak ordering non-determinism through `first()`.
        self.make_bounded()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** is_empty intermediates unaffected by order */),
            )
            .first()
            .is_none()
    }
2511}
2512
2513impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2514where
2515 L: Location<'a>,
2516{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// The output has [`NoOrder`], since matches may be emitted in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
2563
    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
2609}
2610
2611impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2612 Stream<(K, V), L, B, O, R>
2613{
2614 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2615 /// is used as the key and the second element is added to the entries associated with that key.
2616 ///
2617 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2618 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2619 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2620 /// total ordering _within_ each group but no ordering _across_ groups.
2621 ///
2622 /// # Example
2623 /// ```rust
2624 /// # #[cfg(feature = "deploy")] {
2625 /// # use hydro_lang::prelude::*;
2626 /// # use futures::StreamExt;
2627 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2628 /// process
2629 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2630 /// .into_keyed()
2631 /// # .entries()
2632 /// # }, |mut stream| async move {
2633 /// // { 1: [2, 3], 2: [4] }
2634 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2635 /// # assert_eq!(stream.next().await.unwrap(), w);
2636 /// # }
2637 /// # }));
2638 /// # }
2639 /// ```
2640 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2641 KeyedStream::new(
2642 self.location.clone(),
2643 HydroNode::Cast {
2644 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2645 metadata: self
2646 .location
2647 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2648 },
2649 )
2650 }
2651}
2652
impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
        self.into_keyed()
            // Collapse each key's values into `()`; only the distinct keys remain.
            .fold(
                q!(|| ()),
                q!(
                    |_, _| {},
                    commutative = manual_proof!(/** values are ignored */),
                    idempotent = manual_proof!(/** values are ignored */)
                ),
            )
            .keys()
    }
}
2689
2690impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2691where
2692 L: Location<'a> + NoTick,
2693{
2694 /// Returns a stream corresponding to the latest batch of elements being atomically
2695 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2696 /// the order of the input.
2697 ///
2698 /// # Non-Determinism
2699 /// The batch boundaries are non-deterministic and may change across executions.
2700 pub fn batch_atomic(
2701 self,
2702 tick: &Tick<L>,
2703 _nondet: NonDet,
2704 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2705 Stream::new(
2706 tick.clone(),
2707 HydroNode::Batch {
2708 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2709 metadata: tick
2710 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2711 },
2712 )
2713 }
2714
2715 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2716 /// See [`Stream::atomic`] for more details.
2717 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2718 Stream::new(
2719 self.location.tick.l.clone(),
2720 HydroNode::EndAtomic {
2721 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2722 metadata: self
2723 .location
2724 .tick
2725 .l
2726 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2727 },
2728 )
2729 }
2730}
2731
2732impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2733where
2734 L: Location<'a> + NoTick + NoAtomic,
2735 F: Future<Output = T>,
2736{
2737 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2738 /// Future outputs are produced as available, regardless of input arrival order.
2739 ///
2740 /// # Example
2741 /// ```rust
2742 /// # #[cfg(feature = "deploy")] {
2743 /// # use std::collections::HashSet;
2744 /// # use futures::StreamExt;
2745 /// # use hydro_lang::prelude::*;
2746 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2747 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2748 /// .map(q!(|x| async move {
2749 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2750 /// x
2751 /// }))
2752 /// .resolve_futures()
2753 /// # },
2754 /// # |mut stream| async move {
2755 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2756 /// # let mut output = HashSet::new();
2757 /// # for _ in 1..10 {
2758 /// # output.insert(stream.next().await.unwrap());
2759 /// # }
2760 /// # assert_eq!(
2761 /// # output,
2762 /// # HashSet::<i32>::from_iter(1..10)
2763 /// # );
2764 /// # },
2765 /// # ));
2766 /// # }
2767 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2768 Stream::new(
2769 self.location.clone(),
2770 HydroNode::ResolveFutures {
2771 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2772 metadata: self
2773 .location
2774 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2775 },
2776 )
2777 }
2778
2779 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2780 /// Future outputs are produced in the same order as the input stream.
2781 ///
2782 /// # Example
2783 /// ```rust
2784 /// # #[cfg(feature = "deploy")] {
2785 /// # use std::collections::HashSet;
2786 /// # use futures::StreamExt;
2787 /// # use hydro_lang::prelude::*;
2788 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2789 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2790 /// .map(q!(|x| async move {
2791 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2792 /// x
2793 /// }))
2794 /// .resolve_futures_ordered()
2795 /// # },
2796 /// # |mut stream| async move {
2797 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2798 /// # let mut output = Vec::new();
2799 /// # for _ in 1..10 {
2800 /// # output.push(stream.next().await.unwrap());
2801 /// # }
2802 /// # assert_eq!(
2803 /// # output,
2804 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2805 /// # );
2806 /// # },
2807 /// # ));
2808 /// # }
2809 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2810 Stream::new(
2811 self.location.clone(),
2812 HydroNode::ResolveFuturesOrdered {
2813 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2814 metadata: self
2815 .location
2816 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2817 },
2818 )
2819 }
2820}
2821
2822impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2823where
2824 L: Location<'a>,
2825{
2826 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2827 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2828 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2829 Stream::new(
2830 self.location.outer().clone(),
2831 HydroNode::YieldConcat {
2832 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2833 metadata: self
2834 .location
2835 .outer()
2836 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2837 },
2838 )
2839 }
2840
2841 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2842 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2843 ///
2844 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2845 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2846 /// stream's [`Tick`] context.
2847 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2848 let out_location = Atomic {
2849 tick: self.location.clone(),
2850 };
2851
2852 Stream::new(
2853 out_location.clone(),
2854 HydroNode::YieldConcat {
2855 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2856 metadata: out_location
2857 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2858 },
2859 )
2860 }
2861
2862 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2863 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2864 /// input.
2865 ///
2866 /// This API is particularly useful for stateful computation on batches of data, such as
2867 /// maintaining an accumulated state that is up to date with the current batch.
2868 ///
2869 /// # Example
2870 /// ```rust
2871 /// # #[cfg(feature = "deploy")] {
2872 /// # use hydro_lang::prelude::*;
2873 /// # use futures::StreamExt;
2874 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2875 /// let tick = process.tick();
2876 /// # // ticks are lazy by default, forces the second tick to run
2877 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2878 /// # let batch_first_tick = process
2879 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2880 /// # .batch(&tick, nondet!(/** test */));
2881 /// # let batch_second_tick = process
2882 /// # .source_iter(q!(vec![5, 6, 7]))
2883 /// # .batch(&tick, nondet!(/** test */))
2884 /// # .defer_tick(); // appears on the second tick
2885 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2886 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2887 ///
2888 /// input.batch(&tick, nondet!(/** test */))
2889 /// .across_ticks(|s| s.count()).all_ticks()
2890 /// # }, |mut stream| async move {
2891 /// // [4, 7]
2892 /// assert_eq!(stream.next().await.unwrap(), 4);
2893 /// assert_eq!(stream.next().await.unwrap(), 7);
2894 /// # }));
2895 /// # }
2896 /// ```
2897 pub fn across_ticks<Out: BatchAtomic>(
2898 self,
2899 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2900 ) -> Out::Batched {
2901 thunk(self.all_ticks_atomic()).batched_atomic()
2902 }
2903
2904 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2905 /// always has the elements of `self` at tick `T - 1`.
2906 ///
2907 /// At tick `0`, the output stream is empty, since there is no previous tick.
2908 ///
2909 /// This operator enables stateful iterative processing with ticks, by sending data from one
2910 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2911 ///
2912 /// # Example
2913 /// ```rust
2914 /// # #[cfg(feature = "deploy")] {
2915 /// # use hydro_lang::prelude::*;
2916 /// # use futures::StreamExt;
2917 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2918 /// let tick = process.tick();
2919 /// // ticks are lazy by default, forces the second tick to run
2920 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2921 ///
2922 /// let batch_first_tick = process
2923 /// .source_iter(q!(vec![1, 2, 3, 4]))
2924 /// .batch(&tick, nondet!(/** test */));
2925 /// let batch_second_tick = process
2926 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2927 /// .batch(&tick, nondet!(/** test */))
2928 /// .defer_tick(); // appears on the second tick
2929 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2930 ///
2931 /// changes_across_ticks.clone().filter_not_in(
2932 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2933 /// ).all_ticks()
2934 /// # }, |mut stream| async move {
2935 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2936 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2937 /// # assert_eq!(stream.next().await.unwrap(), w);
2938 /// # }
2939 /// # }));
2940 /// # }
2941 /// ```
2942 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2943 Stream::new(
2944 self.location.clone(),
2945 HydroNode::DeferTick {
2946 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2947 metadata: self
2948 .location
2949 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2950 },
2951 )
2952 }
2953}
2954
2955#[cfg(test)]
2956mod tests {
2957 #[cfg(feature = "deploy")]
2958 use futures::{SinkExt, StreamExt};
2959 #[cfg(feature = "deploy")]
2960 use hydro_deploy::Deployment;
2961 #[cfg(feature = "deploy")]
2962 use serde::{Deserialize, Serialize};
2963 #[cfg(any(feature = "deploy", feature = "sim"))]
2964 use stageleft::q;
2965
2966 #[cfg(any(feature = "deploy", feature = "sim"))]
2967 use crate::compile::builder::FlowBuilder;
2968 #[cfg(feature = "deploy")]
2969 use crate::live_collections::sliced::sliced;
2970 #[cfg(feature = "deploy")]
2971 use crate::live_collections::stream::ExactlyOnce;
2972 #[cfg(feature = "sim")]
2973 use crate::live_collections::stream::NoOrder;
2974 #[cfg(any(feature = "deploy", feature = "sim"))]
2975 use crate::live_collections::stream::TotalOrder;
2976 #[cfg(any(feature = "deploy", feature = "sim"))]
2977 use crate::location::Location;
2978 #[cfg(feature = "sim")]
2979 use crate::networking::TCP;
2980 #[cfg(any(feature = "deploy", feature = "sim"))]
2981 use crate::nondet::nondet;
2982
2983 mod backtrace_chained_ops;
2984
    // Marker types naming the two processes in the distributed test topology.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload used to exercise serde/bincode serialization across the network.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2995
    // End-to-end deploy test: streams ten structs from one process to another over
    // TCP, then out to an external client, checking both values and order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        // Ports must be connected after deploy() but before start().
        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
3030
    // Checks that `first()` collapses a flattened stream to a single element,
    // so the downstream `count()` observes cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
3064
    // Verifies that `reduce` over an unbounded stream accumulates state across
    // inputs (1 then 1+2=3) rather than resetting between elements.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3098
    // Crosses an unbounded external input with a bounded folded singleton (sum = 6)
    // at the top level; every input element should pair with the same total.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3136
    // Inside a slice, a `reduce` over a bounded source should produce exactly one
    // value, so its stream count is 1 for every crossed input element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3174
    // Same as the reduce-cardinality test above, but goes through `into_singleton`
    // first; the sliced singleton should still have stream cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3212
    // An atomic fold snapshotted into a tick should replay its full accumulated
    // value (6) on every tick, pairing with each batched input element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3255
    // Verifies that `scan` keeps its accumulator alive across unbounded inputs,
    // emitting running sums (1, then 3).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3294
    // Verifies that `enumerate` keeps its index counter across unbounded inputs,
    // yielding (0, first) and (1, second).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
3325
    // Verifies that `unique` remembers previously-seen values across inputs:
    // a resent 1 is suppressed while a fresh 3 passes through.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        // Duplicate 1 should be filtered; only the new 3 comes out.
        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3361
    // Simulation test: batch sizes are non-deterministic, so asserting a single
    // batch of size 3 must fail in at least one explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
3386
    // Simulation test: however the batches are cut, batching a totally-ordered
    // input must preserve element order across every explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }
3409
    // Simulation test: with a NoOrder input, batching may shuffle elements, so some
    // schedule must produce batches [1,3] and [2] — detected via min/zip/max — and panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3436
    // Simulation test: pins the number of explored schedules for batching 4 unordered
    // elements — every ordered set partition into batches, 75 in total.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
3459
    // Simulation test: `assume_ordering` on a NoOrder batch does not make order real;
    // some explored schedule violates [1,2,3,4], so the assertion must panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }
3481
    // Simulation test: pins the schedule count when order is observed through
    // `assume_ordering` on batches — 4! permutations times 2^3 batch cuts = 192.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3507
    // Simulation test: order-insensitive `count()` collapses permutations, leaving
    // only snapshot-boundary choices — 2^4 explored schedules, final count always 4.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
3533
    // Simulation test: a top-level `assume_ordering` on 3 unordered elements explores
    // exactly the 3! = 6 permutations; contents are correct in every schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }
3555
    // Simulation test: feeds derived values back through a network round-trip cycle
    // into an assume_ordering merge, and checks that at least one of the 6 explored
    // schedules interleaves the fed-back value as [0, 1, 2].
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Cycle: odd successors make a round trip through node2 and back into the merge.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6);
    }
3594
    // Simulation test: same cycle-back shape as above but routed through a tick batch,
    // which multiplies the schedule space (58 instances) while still allowing [0, 1, 2].
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58);
    }
3635
    // Simulation test: two stacked assume_ordering points with a feedback cycle;
    // checks the combined schedule count (24) and that [0, 3, 1] is reachable.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        complete_cycle_back.complete(
            foo.filter(q!(|v| *v == 3))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24);
    }
3680
    // Simulation test: assume_ordering inside an atomic section with a feedback
    // cycle; every schedule yields 4 outputs, with 22 explored instances.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });
        assert_eq!(instance_count, 22);
    }
3715
    // Deploy test: `partition` routes each element to exactly one of two output
    // streams by predicate; results are sorted before comparing since arrival
    // order across the two external ports is not asserted.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn partition_evens_odds() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
        let evens_port = evens.send_bincode_external(&external);
        let odds_port = odds.send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut evens_out = nodes.connect(evens_port).await;
        let mut odds_out = nodes.connect(odds_port).await;

        deployment.start().await.unwrap();

        let mut even_results = Vec::new();
        for _ in 0..3 {
            even_results.push(evens_out.next().await.unwrap());
        }
        even_results.sort();
        assert_eq!(even_results, vec![2, 4, 6]);

        let mut odd_results = Vec::new();
        for _ in 0..3 {
            odd_results.push(odds_out.next().await.unwrap());
        }
        odd_results.sort();
        assert_eq!(odd_results, vec![1, 3, 5]);
    }
3756
3757 #[cfg(feature = "deploy")]
3758 #[tokio::test]
3759 async fn unconsumed_inspect_still_runs() {
3760 use crate::deploy::DeployCrateWrapper;
3761
3762 let mut deployment = Deployment::new();
3763
3764 let mut flow = FlowBuilder::new();
3765 let node = flow.process::<()>();
3766
3767 // The return value of .inspect() is intentionally dropped.
3768 // Before the Null-root fix, this would silently do nothing.
3769 node.source_iter(q!(0..5))
3770 .inspect(q!(|x| println!("inspect: {}", x)));
3771
3772 let nodes = flow
3773 .with_process(&node, deployment.Localhost())
3774 .deploy(&mut deployment);
3775
3776 deployment.deploy().await.unwrap();
3777
3778 let mut stdout = nodes.get_process(&node).stdout();
3779
3780 deployment.start().await.unwrap();
3781
3782 let mut lines = Vec::new();
3783 for _ in 0..5 {
3784 lines.push(stdout.recv().await.unwrap());
3785 }
3786 lines.sort();
3787 assert_eq!(
3788 lines,
3789 vec![
3790 "inspect: 0",
3791 "inspect: 1",
3792 "inspect: 2",
3793 "inspect: 3",
3794 "inspect: 4",
3795 ]
3796 );
3797 }
3798
3799 #[cfg(feature = "sim")]
3800 #[test]
3801 fn sim_limit() {
3802 let mut flow = FlowBuilder::new();
3803 let node = flow.process::<()>();
3804
3805 let (in_send, input) = node.sim_input();
3806
3807 let out_recv = input.limit(q!(3)).sim_output();
3808
3809 flow.sim().exhaustive(async || {
3810 in_send.send(1);
3811 in_send.send(2);
3812 in_send.send(3);
3813 in_send.send(4);
3814 in_send.send(5);
3815
3816 out_recv.assert_yields_only([1, 2, 3]).await;
3817 });
3818 }
3819
3820 #[cfg(feature = "sim")]
3821 #[test]
3822 fn sim_limit_zero() {
3823 let mut flow = FlowBuilder::new();
3824 let node = flow.process::<()>();
3825
3826 let (in_send, input) = node.sim_input();
3827
3828 let out_recv = input.limit(q!(0)).sim_output();
3829
3830 flow.sim().exhaustive(async || {
3831 in_send.send(1);
3832 in_send.send(2);
3833
3834 out_recv.assert_yields_only::<i32, _>([]).await;
3835 });
3836 }
3837
3838 #[cfg(feature = "deploy")]
3839 #[tokio::test]
3840 async fn monotone_fold_threshold() {
3841 use crate::properties::manual_proof;
3842
3843 let mut deployment = Deployment::new();
3844
3845 let mut flow = FlowBuilder::new();
3846 let node = flow.process::<()>();
3847 let external = flow.external::<()>();
3848
3849 let in_unbounded: super::Stream<_, _> =
3850 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3851 let sum = in_unbounded.fold(
3852 q!(|| 0),
3853 q!(
3854 |sum, v| {
3855 *sum += v;
3856 },
3857 monotone = manual_proof!(/** test */)
3858 ),
3859 );
3860
3861 let threshold_out = sum
3862 .threshold_greater_or_equal(node.singleton(q!(7)))
3863 .send_bincode_external(&external);
3864
3865 let nodes = flow
3866 .with_process(&node, deployment.Localhost())
3867 .with_external(&external, deployment.Localhost())
3868 .deploy(&mut deployment);
3869
3870 deployment.deploy().await.unwrap();
3871
3872 let mut threshold_out = nodes.connect(threshold_out).await;
3873
3874 deployment.start().await.unwrap();
3875
3876 assert_eq!(threshold_out.next().await.unwrap(), 7);
3877 }
3878
3879 #[cfg(feature = "deploy")]
3880 #[tokio::test]
3881 async fn monotone_count_threshold() {
3882 let mut deployment = Deployment::new();
3883
3884 let mut flow = FlowBuilder::new();
3885 let node = flow.process::<()>();
3886 let external = flow.external::<()>();
3887
3888 let in_unbounded: super::Stream<_, _> =
3889 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3890 let sum = in_unbounded.count();
3891
3892 let threshold_out = sum
3893 .threshold_greater_or_equal(node.singleton(q!(3)))
3894 .send_bincode_external(&external);
3895
3896 let nodes = flow
3897 .with_process(&node, deployment.Localhost())
3898 .with_external(&external, deployment.Localhost())
3899 .deploy(&mut deployment);
3900
3901 deployment.deploy().await.unwrap();
3902
3903 let mut threshold_out = nodes.connect(threshold_out).await;
3904
3905 deployment.start().await.unwrap();
3906
3907 assert_eq!(threshold_out.next().await.unwrap(), 3);
3908 }
3909}