hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45 /// The [`StreamOrder`] corresponding to this type.
46 const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82 /// The weaker of the two orderings.
83 type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88 type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93 type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99 MinRetries<Self, Min = Self>
100 + MinRetries<ExactlyOnce, Min = Self>
101 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103 /// The [`StreamRetry`] corresponding to this type.
104 const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136 /// The weaker of the two retry guarantees.
137 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142 type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147 type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153 label = "required here",
154 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166 label = "required here",
167 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192/// (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196 Type,
197 Loc,
198 Bound: Boundedness = Unbounded,
199 Order: Ordering = TotalOrder,
200 Retry: Retries = ExactlyOnce,
201> {
202 pub(crate) location: Loc,
203 pub(crate) ir_node: RefCell<HydroNode>,
204 pub(crate) flow_state: FlowState,
205
206 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244 for Stream<T, L, B, NoOrder, R>
245where
246 L: Location<'a>,
247{
248 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249 stream.weaken_ordering()
250 }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254 for Stream<T, L, B, O, AtLeastOnce>
255where
256 L: Location<'a>,
257{
258 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259 stream.weaken_retries()
260 }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265 L: Location<'a>,
266{
267 fn defer_tick(self) -> Self {
268 Stream::defer_tick(self)
269 }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 type Location = Tick<L>;
278
279 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280 Stream::new(
281 location.clone(),
282 HydroNode::CycleSource {
283 cycle_id,
284 metadata: location.new_node_metadata(Self::collection_kind()),
285 },
286 )
287 }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291 for Stream<T, Tick<L>, Bounded, O, R>
292where
293 L: Location<'a>,
294{
295 type Location = Tick<L>;
296
297 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
298 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
299 location.clone(),
300 HydroNode::DeferTick {
301 input: Box::new(HydroNode::CycleSource {
302 cycle_id,
303 metadata: location.new_node_metadata(Self::collection_kind()),
304 }),
305 metadata: location.new_node_metadata(Self::collection_kind()),
306 },
307 );
308
309 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
310 }
311}
312
313impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
314 for Stream<T, Tick<L>, Bounded, O, R>
315where
316 L: Location<'a>,
317{
318 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
319 assert_eq!(
320 Location::id(&self.location),
321 expected_location,
322 "locations do not match"
323 );
324 self.location
325 .flow_state()
326 .borrow_mut()
327 .push_root(HydroRoot::CycleSink {
328 cycle_id,
329 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
330 op_metadata: HydroIrOpMetadata::new(),
331 });
332 }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
336 for Stream<T, L, B, O, R>
337where
338 L: Location<'a> + NoTick,
339{
340 type Location = L;
341
342 fn create_source(cycle_id: CycleId, location: L) -> Self {
343 Stream::new(
344 location.clone(),
345 HydroNode::CycleSource {
346 cycle_id,
347 metadata: location.new_node_metadata(Self::collection_kind()),
348 },
349 )
350 }
351}
352
353impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
354 for Stream<T, L, B, O, R>
355where
356 L: Location<'a> + NoTick,
357{
358 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
359 assert_eq!(
360 Location::id(&self.location),
361 expected_location,
362 "locations do not match"
363 );
364 self.location
365 .flow_state()
366 .borrow_mut()
367 .push_root(HydroRoot::CycleSink {
368 cycle_id,
369 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
370 op_metadata: HydroIrOpMetadata::new(),
371 });
372 }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
376where
377 T: Clone,
378 L: Location<'a>,
379{
380 fn clone(&self) -> Self {
381 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
382 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
383 *self.ir_node.borrow_mut() = HydroNode::Tee {
384 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
385 metadata: self.location.new_node_metadata(Self::collection_kind()),
386 };
387 }
388
389 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
390 Stream {
391 location: self.location.clone(),
392 flow_state: self.flow_state.clone(),
393 ir_node: HydroNode::Tee {
394 inner: SharedNode(inner.0.clone()),
395 metadata: metadata.clone(),
396 }
397 .into(),
398 _phantom: PhantomData,
399 }
400 } else {
401 unreachable!()
402 }
403 }
404}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408 L: Location<'a>,
409{
410 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
411 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
412 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
413
414 let flow_state = location.flow_state().clone();
415 Stream {
416 location,
417 flow_state,
418 ir_node: RefCell::new(ir_node),
419 _phantom: PhantomData,
420 }
421 }
422
423 /// Returns the [`Location`] where this stream is being materialized.
424 pub fn location(&self) -> &L {
425 &self.location
426 }
427
428 pub(crate) fn collection_kind() -> CollectionKind {
429 CollectionKind::Stream {
430 bound: B::BOUND_KIND,
431 order: O::ORDERING_KIND,
432 retry: R::RETRIES_KIND,
433 element_type: quote_type::<T>().into(),
434 }
435 }
436
437 /// Produces a stream based on invoking `f` on each element.
438 /// If you do not want to modify the stream and instead only want to view
439 /// each item use [`Stream::inspect`] instead.
440 ///
441 /// # Example
442 /// ```rust
443 /// # #[cfg(feature = "deploy")] {
444 /// # use hydro_lang::prelude::*;
445 /// # use futures::StreamExt;
446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
447 /// let words = process.source_iter(q!(vec!["hello", "world"]));
448 /// words.map(q!(|x| x.to_uppercase()))
449 /// # }, |mut stream| async move {
450 /// # for w in vec!["HELLO", "WORLD"] {
451 /// # assert_eq!(stream.next().await.unwrap(), w);
452 /// # }
453 /// # }));
454 /// # }
455 /// ```
456 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
457 where
458 F: Fn(T) -> U + 'a,
459 {
460 let f = f.splice_fn1_ctx(&self.location).into();
461 Stream::new(
462 self.location.clone(),
463 HydroNode::Map {
464 f,
465 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
466 metadata: self
467 .location
468 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
469 },
470 )
471 }
472
473 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
474 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
475 /// for the output type `U` must produce items in a **deterministic** order.
476 ///
477 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
478 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
479 ///
480 /// # Example
481 /// ```rust
482 /// # #[cfg(feature = "deploy")] {
483 /// # use hydro_lang::prelude::*;
484 /// # use futures::StreamExt;
485 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
486 /// process
487 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
488 /// .flat_map_ordered(q!(|x| x))
489 /// # }, |mut stream| async move {
490 /// // 1, 2, 3, 4
491 /// # for w in (1..5) {
492 /// # assert_eq!(stream.next().await.unwrap(), w);
493 /// # }
494 /// # }));
495 /// # }
496 /// ```
497 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
498 where
499 I: IntoIterator<Item = U>,
500 F: Fn(T) -> I + 'a,
501 {
502 let f = f.splice_fn1_ctx(&self.location).into();
503 Stream::new(
504 self.location.clone(),
505 HydroNode::FlatMap {
506 f,
507 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
508 metadata: self
509 .location
510 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
511 },
512 )
513 }
514
515 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
516 /// for the output type `U` to produce items in any order.
517 ///
518 /// # Example
519 /// ```rust
520 /// # #[cfg(feature = "deploy")] {
521 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
522 /// # use futures::StreamExt;
523 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
524 /// process
525 /// .source_iter(q!(vec![
526 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
527 /// std::collections::HashSet::from_iter(vec![3, 4]),
528 /// ]))
529 /// .flat_map_unordered(q!(|x| x))
530 /// # }, |mut stream| async move {
531 /// // 1, 2, 3, 4, but in no particular order
532 /// # let mut results = Vec::new();
533 /// # for w in (1..5) {
534 /// # results.push(stream.next().await.unwrap());
535 /// # }
536 /// # results.sort();
537 /// # assert_eq!(results, vec![1, 2, 3, 4]);
538 /// # }));
539 /// # }
540 /// ```
541 pub fn flat_map_unordered<U, I, F>(
542 self,
543 f: impl IntoQuotedMut<'a, F, L>,
544 ) -> Stream<U, L, B, NoOrder, R>
545 where
546 I: IntoIterator<Item = U>,
547 F: Fn(T) -> I + 'a,
548 {
549 let f = f.splice_fn1_ctx(&self.location).into();
550 Stream::new(
551 self.location.clone(),
552 HydroNode::FlatMap {
553 f,
554 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
555 metadata: self
556 .location
557 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
558 },
559 )
560 }
561
562 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
563 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
564 ///
565 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
566 /// not deterministic, use [`Stream::flatten_unordered`] instead.
567 ///
568 /// ```rust
569 /// # #[cfg(feature = "deploy")] {
570 /// # use hydro_lang::prelude::*;
571 /// # use futures::StreamExt;
572 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573 /// process
574 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
575 /// .flatten_ordered()
576 /// # }, |mut stream| async move {
577 /// // 1, 2, 3, 4
578 /// # for w in (1..5) {
579 /// # assert_eq!(stream.next().await.unwrap(), w);
580 /// # }
581 /// # }));
582 /// # }
583 /// ```
584 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
585 where
586 T: IntoIterator<Item = U>,
587 {
588 self.flat_map_ordered(q!(|d| d))
589 }
590
591 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
592 /// for the element type `T` to produce items in any order.
593 ///
594 /// # Example
595 /// ```rust
596 /// # #[cfg(feature = "deploy")] {
597 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
598 /// # use futures::StreamExt;
599 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
600 /// process
601 /// .source_iter(q!(vec![
602 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
603 /// std::collections::HashSet::from_iter(vec![3, 4]),
604 /// ]))
605 /// .flatten_unordered()
606 /// # }, |mut stream| async move {
607 /// // 1, 2, 3, 4, but in no particular order
608 /// # let mut results = Vec::new();
609 /// # for w in (1..5) {
610 /// # results.push(stream.next().await.unwrap());
611 /// # }
612 /// # results.sort();
613 /// # assert_eq!(results, vec![1, 2, 3, 4]);
614 /// # }));
615 /// # }
616 /// ```
617 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
618 where
619 T: IntoIterator<Item = U>,
620 {
621 self.flat_map_unordered(q!(|d| d))
622 }
623
624 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
625 /// then emit the elements of that stream one by one. When the inner stream yields
626 /// `Pending`, this operator yields as well.
627 pub fn flat_map_stream_blocking<U, S, F>(
628 self,
629 f: impl IntoQuotedMut<'a, F, L>,
630 ) -> Stream<U, L, B, O, R>
631 where
632 S: futures::Stream<Item = U>,
633 F: Fn(T) -> S + 'a,
634 {
635 let f = f.splice_fn1_ctx(&self.location).into();
636 Stream::new(
637 self.location.clone(),
638 HydroNode::FlatMapStreamBlocking {
639 f,
640 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
641 metadata: self
642 .location
643 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
644 },
645 )
646 }
647
648 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
649 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
650 /// yields as well.
651 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
652 where
653 T: futures::Stream<Item = U>,
654 {
655 self.flat_map_stream_blocking(q!(|d| d))
656 }
657
658 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
659 /// `f`, preserving the order of the elements.
660 ///
661 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
662 /// not modify or take ownership of the values. If you need to modify the values while filtering
663 /// use [`Stream::filter_map`] instead.
664 ///
665 /// # Example
666 /// ```rust
667 /// # #[cfg(feature = "deploy")] {
668 /// # use hydro_lang::prelude::*;
669 /// # use futures::StreamExt;
670 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
671 /// process
672 /// .source_iter(q!(vec![1, 2, 3, 4]))
673 /// .filter(q!(|&x| x > 2))
674 /// # }, |mut stream| async move {
675 /// // 3, 4
676 /// # for w in (3..5) {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// # }
681 /// ```
682 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
683 where
684 F: Fn(&T) -> bool + 'a,
685 {
686 let f = f.splice_fn1_borrow_ctx(&self.location).into();
687 Stream::new(
688 self.location.clone(),
689 HydroNode::Filter {
690 f,
691 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692 metadata: self.location.new_node_metadata(Self::collection_kind()),
693 },
694 )
695 }
696
697 /// Splits the stream into two streams based on a predicate, without cloning elements.
698 ///
699 /// Elements for which `f` returns `true` are sent to the first output stream,
700 /// and elements for which `f` returns `false` are sent to the second output stream.
701 ///
702 /// Unlike using `filter` twice, this only evaluates the predicate once per element
703 /// and does not require `T: Clone`.
704 ///
705 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
706 /// the predicate is only used for routing; the element itself is moved to the
707 /// appropriate output stream.
708 ///
709 /// # Example
710 /// ```rust
711 /// # #[cfg(feature = "deploy")] {
712 /// # use hydro_lang::prelude::*;
713 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
714 /// # use futures::StreamExt;
715 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
716 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
717 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
718 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
719 /// evens.map(q!(|x| (x, true)))
720 /// .merge_unordered(odds.map(q!(|x| (x, false))))
721 /// # }, |mut stream| async move {
722 /// # let mut results = Vec::new();
723 /// # for _ in 0..6 {
724 /// # results.push(stream.next().await.unwrap());
725 /// # }
726 /// # results.sort();
727 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
728 /// # }));
729 /// # }
730 /// ```
731 #[expect(
732 clippy::type_complexity,
733 reason = "return type mirrors the input stream type"
734 )]
735 pub fn partition<F>(
736 self,
737 f: impl IntoQuotedMut<'a, F, L>,
738 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
739 where
740 F: Fn(&T) -> bool + 'a,
741 {
742 let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
743 let shared = SharedNode(Rc::new(RefCell::new(
744 self.ir_node.replace(HydroNode::Placeholder),
745 )));
746
747 let true_stream = Stream::new(
748 self.location.clone(),
749 HydroNode::Partition {
750 inner: SharedNode(shared.0.clone()),
751 f: f.clone(),
752 is_true: true,
753 metadata: self.location.new_node_metadata(Self::collection_kind()),
754 },
755 );
756
757 let false_stream = Stream::new(
758 self.location.clone(),
759 HydroNode::Partition {
760 inner: SharedNode(shared.0),
761 f,
762 is_true: false,
763 metadata: self.location.new_node_metadata(Self::collection_kind()),
764 },
765 );
766
767 (true_stream, false_stream)
768 }
769
770 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
771 ///
772 /// # Example
773 /// ```rust
774 /// # #[cfg(feature = "deploy")] {
775 /// # use hydro_lang::prelude::*;
776 /// # use futures::StreamExt;
777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778 /// process
779 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
780 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
781 /// # }, |mut stream| async move {
782 /// // 1, 2
783 /// # for w in (1..3) {
784 /// # assert_eq!(stream.next().await.unwrap(), w);
785 /// # }
786 /// # }));
787 /// # }
788 /// ```
789 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
790 where
791 F: Fn(T) -> Option<U> + 'a,
792 {
793 let f = f.splice_fn1_ctx(&self.location).into();
794 Stream::new(
795 self.location.clone(),
796 HydroNode::FilterMap {
797 f,
798 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
799 metadata: self
800 .location
801 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
802 },
803 )
804 }
805
806 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
807 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
808 /// If `other` is an empty [`Optional`], no values will be produced.
809 ///
810 /// # Example
811 /// ```rust
812 /// # #[cfg(feature = "deploy")] {
813 /// # use hydro_lang::prelude::*;
814 /// # use futures::StreamExt;
815 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816 /// let tick = process.tick();
817 /// let batch = process
818 /// .source_iter(q!(vec![1, 2, 3, 4]))
819 /// .batch(&tick, nondet!(/** test */));
820 /// let count = batch.clone().count(); // `count()` returns a singleton
821 /// batch.cross_singleton(count).all_ticks()
822 /// # }, |mut stream| async move {
823 /// // (1, 4), (2, 4), (3, 4), (4, 4)
824 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
825 /// # assert_eq!(stream.next().await.unwrap(), w);
826 /// # }
827 /// # }));
828 /// # }
829 /// ```
830 pub fn cross_singleton<O2>(
831 self,
832 other: impl Into<Optional<O2, L, Bounded>>,
833 ) -> Stream<(T, O2), L, B, O, R>
834 where
835 O2: Clone,
836 {
837 let other: Optional<O2, L, Bounded> = other.into();
838 check_matching_location(&self.location, &other.location);
839
840 Stream::new(
841 self.location.clone(),
842 HydroNode::CrossSingleton {
843 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
844 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
845 metadata: self
846 .location
847 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
848 },
849 )
850 }
851
852 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
853 ///
854 /// # Example
855 /// ```rust
856 /// # #[cfg(feature = "deploy")] {
857 /// # use hydro_lang::prelude::*;
858 /// # use futures::StreamExt;
859 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860 /// let tick = process.tick();
861 /// // ticks are lazy by default, forces the second tick to run
862 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863 ///
864 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
865 /// let batch_first_tick = process
866 /// .source_iter(q!(vec![1, 2, 3, 4]))
867 /// .batch(&tick, nondet!(/** test */));
868 /// let batch_second_tick = process
869 /// .source_iter(q!(vec![5, 6, 7, 8]))
870 /// .batch(&tick, nondet!(/** test */))
871 /// .defer_tick();
872 /// batch_first_tick.chain(batch_second_tick)
873 /// .filter_if(signal)
874 /// .all_ticks()
875 /// # }, |mut stream| async move {
876 /// // [1, 2, 3, 4]
877 /// # for w in vec![1, 2, 3, 4] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// # }
882 /// ```
883 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
884 self.cross_singleton(signal.filter(q!(|b| *b)))
885 .map(q!(|(d, _)| d))
886 }
887
888 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
889 ///
890 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
891 /// leader of a cluster.
892 ///
893 /// # Example
894 /// ```rust
895 /// # #[cfg(feature = "deploy")] {
896 /// # use hydro_lang::prelude::*;
897 /// # use futures::StreamExt;
898 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899 /// let tick = process.tick();
900 /// // ticks are lazy by default, forces the second tick to run
901 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902 ///
903 /// let batch_first_tick = process
904 /// .source_iter(q!(vec![1, 2, 3, 4]))
905 /// .batch(&tick, nondet!(/** test */));
906 /// let batch_second_tick = process
907 /// .source_iter(q!(vec![5, 6, 7, 8]))
908 /// .batch(&tick, nondet!(/** test */))
909 /// .defer_tick(); // appears on the second tick
910 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
911 /// batch_first_tick.chain(batch_second_tick)
912 /// .filter_if_some(some_on_first_tick)
913 /// .all_ticks()
914 /// # }, |mut stream| async move {
915 /// // [1, 2, 3, 4]
916 /// # for w in vec![1, 2, 3, 4] {
917 /// # assert_eq!(stream.next().await.unwrap(), w);
918 /// # }
919 /// # }));
920 /// # }
921 /// ```
922 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
923 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
924 self.filter_if(signal.is_some())
925 }
926
927 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
928 ///
929 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
930 /// some local state.
931 ///
932 /// # Example
933 /// ```rust
934 /// # #[cfg(feature = "deploy")] {
935 /// # use hydro_lang::prelude::*;
936 /// # use futures::StreamExt;
937 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938 /// let tick = process.tick();
939 /// // ticks are lazy by default, forces the second tick to run
940 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
941 ///
942 /// let batch_first_tick = process
943 /// .source_iter(q!(vec![1, 2, 3, 4]))
944 /// .batch(&tick, nondet!(/** test */));
945 /// let batch_second_tick = process
946 /// .source_iter(q!(vec![5, 6, 7, 8]))
947 /// .batch(&tick, nondet!(/** test */))
948 /// .defer_tick(); // appears on the second tick
949 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
950 /// batch_first_tick.chain(batch_second_tick)
951 /// .filter_if_none(some_on_first_tick)
952 /// .all_ticks()
953 /// # }, |mut stream| async move {
954 /// // [5, 6, 7, 8]
955 /// # for w in vec![5, 6, 7, 8] {
956 /// # assert_eq!(stream.next().await.unwrap(), w);
957 /// # }
958 /// # }));
959 /// # }
960 /// ```
961 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
962 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
963 self.filter_if(other.is_none())
964 }
965
966 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
967 /// tupled pairs in a non-deterministic order.
968 ///
969 /// # Example
970 /// ```rust
971 /// # #[cfg(feature = "deploy")] {
972 /// # use hydro_lang::prelude::*;
973 /// # use std::collections::HashSet;
974 /// # use futures::StreamExt;
975 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976 /// let tick = process.tick();
977 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
978 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
979 /// stream1.cross_product(stream2)
980 /// # }, |mut stream| async move {
981 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
982 /// # stream.map(|i| assert!(expected.contains(&i)));
983 /// # }));
984 /// # }
985 /// ```
986 pub fn cross_product<T2, O2: Ordering>(
987 self,
988 other: Stream<T2, L, B, O2, R>,
989 ) -> Stream<(T, T2), L, B, NoOrder, R>
990 where
991 T: Clone,
992 T2: Clone,
993 {
994 check_matching_location(&self.location, &other.location);
995
996 Stream::new(
997 self.location.clone(),
998 HydroNode::CrossProduct {
999 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1000 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1001 metadata: self
1002 .location
1003 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
1004 },
1005 )
1006 }
1007
1008 /// Takes one stream as input and filters out any duplicate occurrences. The output
1009 /// contains all unique values from the input.
1010 ///
1011 /// # Example
1012 /// ```rust
1013 /// # #[cfg(feature = "deploy")] {
1014 /// # use hydro_lang::prelude::*;
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1017 /// let tick = process.tick();
1018 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1019 /// # }, |mut stream| async move {
1020 /// # for w in vec![1, 2, 3, 4] {
1021 /// # assert_eq!(stream.next().await.unwrap(), w);
1022 /// # }
1023 /// # }));
1024 /// # }
1025 /// ```
1026 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1027 where
1028 T: Eq + Hash,
1029 {
1030 Stream::new(
1031 self.location.clone(),
1032 HydroNode::Unique {
1033 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1034 metadata: self
1035 .location
1036 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1037 },
1038 )
1039 }
1040
1041 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1042 ///
1043 /// The `other` stream must be [`Bounded`], since this function will wait until
1044 /// all its elements are available before producing any output.
1045 /// # Example
1046 /// ```rust
1047 /// # #[cfg(feature = "deploy")] {
1048 /// # use hydro_lang::prelude::*;
1049 /// # use futures::StreamExt;
1050 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1051 /// let tick = process.tick();
1052 /// let stream = process
1053 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1054 /// .batch(&tick, nondet!(/** test */));
1055 /// let batch = process
1056 /// .source_iter(q!(vec![1, 2]))
1057 /// .batch(&tick, nondet!(/** test */));
1058 /// stream.filter_not_in(batch).all_ticks()
1059 /// # }, |mut stream| async move {
1060 /// # for w in vec![3, 4] {
1061 /// # assert_eq!(stream.next().await.unwrap(), w);
1062 /// # }
1063 /// # }));
1064 /// # }
1065 /// ```
1066 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1067 where
1068 T: Eq + Hash,
1069 B2: IsBounded,
1070 {
1071 check_matching_location(&self.location, &other.location);
1072
1073 Stream::new(
1074 self.location.clone(),
1075 HydroNode::Difference {
1076 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1077 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1078 metadata: self
1079 .location
1080 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1081 },
1082 )
1083 }
1084
1085 /// An operator which allows you to "inspect" each element of a stream without
1086 /// modifying it. The closure `f` is called on a reference to each item. This is
1087 /// mainly useful for debugging, and should not be used to generate side-effects.
1088 ///
1089 /// # Example
1090 /// ```rust
1091 /// # #[cfg(feature = "deploy")] {
1092 /// # use hydro_lang::prelude::*;
1093 /// # use futures::StreamExt;
1094 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095 /// let nums = process.source_iter(q!(vec![1, 2]));
1096 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1097 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1098 /// # }, |mut stream| async move {
1099 /// # for w in vec![1, 2] {
1100 /// # assert_eq!(stream.next().await.unwrap(), w);
1101 /// # }
1102 /// # }));
1103 /// # }
1104 /// ```
1105 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1106 where
1107 F: Fn(&T) + 'a,
1108 {
1109 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1110
1111 Stream::new(
1112 self.location.clone(),
1113 HydroNode::Inspect {
1114 f,
1115 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1116 metadata: self.location.new_node_metadata(Self::collection_kind()),
1117 },
1118 )
1119 }
1120
1121 /// Executes the provided closure for every element in this stream.
1122 ///
1123 /// Because the closure may have side effects, the stream must have deterministic order
1124 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1125 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1126 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1127 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1128 where
1129 O: IsOrdered,
1130 R: IsExactlyOnce,
1131 {
1132 let f = f.splice_fn1_ctx(&self.location).into();
1133 self.location
1134 .flow_state()
1135 .borrow_mut()
1136 .push_root(HydroRoot::ForEach {
1137 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138 f,
1139 op_metadata: HydroIrOpMetadata::new(),
1140 });
1141 }
1142
1143 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1144 /// TCP socket to some other server. You should _not_ use this API for interacting with
1145 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1146 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1147 /// interaction with asynchronous sinks.
1148 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1149 where
1150 O: IsOrdered,
1151 R: IsExactlyOnce,
1152 S: 'a + futures::Sink<T> + Unpin,
1153 {
1154 self.location
1155 .flow_state()
1156 .borrow_mut()
1157 .push_root(HydroRoot::DestSink {
1158 sink: sink.splice_typed_ctx(&self.location).into(),
1159 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1160 op_metadata: HydroIrOpMetadata::new(),
1161 });
1162 }
1163
1164 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1165 ///
1166 /// # Example
1167 /// ```rust
1168 /// # #[cfg(feature = "deploy")] {
1169 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1170 /// # use futures::StreamExt;
1171 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1172 /// let tick = process.tick();
1173 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1174 /// numbers.enumerate()
1175 /// # }, |mut stream| async move {
1176 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1177 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1178 /// # assert_eq!(stream.next().await.unwrap(), w);
1179 /// # }
1180 /// # }));
1181 /// # }
1182 /// ```
1183 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1184 where
1185 O: IsOrdered,
1186 R: IsExactlyOnce,
1187 {
1188 Stream::new(
1189 self.location.clone(),
1190 HydroNode::Enumerate {
1191 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1192 metadata: self.location.new_node_metadata(Stream::<
1193 (usize, T),
1194 L,
1195 B,
1196 TotalOrder,
1197 ExactlyOnce,
1198 >::collection_kind()),
1199 },
1200 )
1201 }
1202
1203 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1204 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1205 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1206 ///
1207 /// Depending on the input stream guarantees, the closure may need to be commutative
1208 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1209 ///
1210 /// # Example
1211 /// ```rust
1212 /// # #[cfg(feature = "deploy")] {
1213 /// # use hydro_lang::prelude::*;
1214 /// # use futures::StreamExt;
1215 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1217 /// words
1218 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1219 /// .into_stream()
1220 /// # }, |mut stream| async move {
1221 /// // "HELLOWORLD"
1222 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1223 /// # }));
1224 /// # }
1225 /// ```
1226 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1227 self,
1228 init: impl IntoQuotedMut<'a, I, L>,
1229 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1230 ) -> Singleton<A, L, B2>
1231 where
1232 I: Fn() -> A + 'a,
1233 F: Fn(&mut A, T),
1234 C: ValidCommutativityFor<O>,
1235 Idemp: ValidIdempotenceFor<R>,
1236 B: ApplyMonotoneStream<M, B2>,
1237 {
1238 let init = init.splice_fn0_ctx(&self.location).into();
1239 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1240 proof.register_proof(&comb);
1241
1242 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1243 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1244
1245 let core = HydroNode::Fold {
1246 init,
1247 acc: comb.into(),
1248 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1249 metadata: ordered_etc
1250 .location
1251 .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1252 };
1253
1254 Singleton::new(ordered_etc.location.clone(), core)
1255 }
1256
1257 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260 /// reference, so that it can be modified in place.
1261 ///
1262 /// Depending on the input stream guarantees, the closure may need to be commutative
1263 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1264 ///
1265 /// # Example
1266 /// ```rust
1267 /// # #[cfg(feature = "deploy")] {
1268 /// # use hydro_lang::prelude::*;
1269 /// # use futures::StreamExt;
1270 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271 /// let bools = process.source_iter(q!(vec![false, true, false]));
1272 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1273 /// # }, |mut stream| async move {
1274 /// // true
1275 /// # assert_eq!(stream.next().await.unwrap(), true);
1276 /// # }));
1277 /// # }
1278 /// ```
1279 pub fn reduce<F, C, Idemp>(
1280 self,
1281 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1282 ) -> Optional<T, L, B>
1283 where
1284 F: Fn(&mut T, T) + 'a,
1285 C: ValidCommutativityFor<O>,
1286 Idemp: ValidIdempotenceFor<R>,
1287 {
1288 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1289 proof.register_proof(&f);
1290
1291 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1292 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1293
1294 let core = HydroNode::Reduce {
1295 f: f.into(),
1296 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1297 metadata: ordered_etc
1298 .location
1299 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1300 };
1301
1302 Optional::new(ordered_etc.location.clone(), core)
1303 }
1304
1305 /// Computes the maximum element in the stream as an [`Optional`], which
1306 /// will be empty until the first element in the input arrives.
1307 ///
1308 /// # Example
1309 /// ```rust
1310 /// # #[cfg(feature = "deploy")] {
1311 /// # use hydro_lang::prelude::*;
1312 /// # use futures::StreamExt;
1313 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1314 /// let tick = process.tick();
1315 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1316 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1317 /// batch.max().all_ticks()
1318 /// # }, |mut stream| async move {
1319 /// // 4
1320 /// # assert_eq!(stream.next().await.unwrap(), 4);
1321 /// # }));
1322 /// # }
1323 /// ```
1324 pub fn max(self) -> Optional<T, L, B>
1325 where
1326 T: Ord,
1327 {
1328 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1329 .assume_ordering_trusted_bounded::<TotalOrder>(
1330 nondet!(/** max is commutative, but order affects intermediates */),
1331 )
1332 .reduce(q!(|curr, new| {
1333 if new > *curr {
1334 *curr = new;
1335 }
1336 }))
1337 }
1338
1339 /// Computes the minimum element in the stream as an [`Optional`], which
1340 /// will be empty until the first element in the input arrives.
1341 ///
1342 /// # Example
1343 /// ```rust
1344 /// # #[cfg(feature = "deploy")] {
1345 /// # use hydro_lang::prelude::*;
1346 /// # use futures::StreamExt;
1347 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348 /// let tick = process.tick();
1349 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1350 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1351 /// batch.min().all_ticks()
1352 /// # }, |mut stream| async move {
1353 /// // 1
1354 /// # assert_eq!(stream.next().await.unwrap(), 1);
1355 /// # }));
1356 /// # }
1357 /// ```
1358 pub fn min(self) -> Optional<T, L, B>
1359 where
1360 T: Ord,
1361 {
1362 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1363 .assume_ordering_trusted_bounded::<TotalOrder>(
1364 nondet!(/** max is commutative, but order affects intermediates */),
1365 )
1366 .reduce(q!(|curr, new| {
1367 if new < *curr {
1368 *curr = new;
1369 }
1370 }))
1371 }
1372
1373 /// Computes the first element in the stream as an [`Optional`], which
1374 /// will be empty until the first element in the input arrives.
1375 ///
1376 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1377 /// re-ordering of elements may cause the first element to change.
1378 ///
1379 /// # Example
1380 /// ```rust
1381 /// # #[cfg(feature = "deploy")] {
1382 /// # use hydro_lang::prelude::*;
1383 /// # use futures::StreamExt;
1384 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1385 /// let tick = process.tick();
1386 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1387 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1388 /// batch.first().all_ticks()
1389 /// # }, |mut stream| async move {
1390 /// // 1
1391 /// # assert_eq!(stream.next().await.unwrap(), 1);
1392 /// # }));
1393 /// # }
1394 /// ```
1395 pub fn first(self) -> Optional<T, L, B>
1396 where
1397 O: IsOrdered,
1398 {
1399 self.make_totally_ordered()
1400 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1401 .reduce(q!(|_, _| {}))
1402 }
1403
1404 /// Computes the last element in the stream as an [`Optional`], which
1405 /// will be empty until an element in the input arrives.
1406 ///
1407 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1408 /// re-ordering of elements may cause the last element to change.
1409 ///
1410 /// # Example
1411 /// ```rust
1412 /// # #[cfg(feature = "deploy")] {
1413 /// # use hydro_lang::prelude::*;
1414 /// # use futures::StreamExt;
1415 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416 /// let tick = process.tick();
1417 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1418 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419 /// batch.last().all_ticks()
1420 /// # }, |mut stream| async move {
1421 /// // 4
1422 /// # assert_eq!(stream.next().await.unwrap(), 4);
1423 /// # }));
1424 /// # }
1425 /// ```
1426 pub fn last(self) -> Optional<T, L, B>
1427 where
1428 O: IsOrdered,
1429 {
1430 self.make_totally_ordered()
1431 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1432 .reduce(q!(|curr, new| *curr = new))
1433 }
1434
1435 /// Returns a stream containing at most the first `n` elements of the input stream,
1436 /// preserving the original order. Similar to `LIMIT` in SQL.
1437 ///
1438 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1439 /// retries, since the result depends on the order and cardinality of elements.
1440 ///
1441 /// # Example
1442 /// ```rust
1443 /// # #[cfg(feature = "deploy")] {
1444 /// # use hydro_lang::prelude::*;
1445 /// # use futures::StreamExt;
1446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1447 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1448 /// numbers.limit(q!(3))
1449 /// # }, |mut stream| async move {
1450 /// // 10, 20, 30
1451 /// # for w in vec![10, 20, 30] {
1452 /// # assert_eq!(stream.next().await.unwrap(), w);
1453 /// # }
1454 /// # }));
1455 /// # }
1456 /// ```
1457 pub fn limit(
1458 self,
1459 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1460 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1461 where
1462 O: IsOrdered,
1463 R: IsExactlyOnce,
1464 {
1465 self.generator(
1466 q!(|| 0usize),
1467 q!(move |count, item| {
1468 if *count == n {
1469 Generate::Break
1470 } else {
1471 *count += 1;
1472 if *count == n {
1473 Generate::Return(item)
1474 } else {
1475 Generate::Yield(item)
1476 }
1477 }
1478 }),
1479 )
1480 }
1481
1482 /// Collects all the elements of this stream into a single [`Vec`] element.
1483 ///
1484 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1485 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1486 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1487 /// the vector at an arbitrary point in time.
1488 ///
1489 /// # Example
1490 /// ```rust
1491 /// # #[cfg(feature = "deploy")] {
1492 /// # use hydro_lang::prelude::*;
1493 /// # use futures::StreamExt;
1494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495 /// let tick = process.tick();
1496 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1497 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1498 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1499 /// # }, |mut stream| async move {
1500 /// // [ vec![1, 2, 3, 4] ]
1501 /// # for w in vec![vec![1, 2, 3, 4]] {
1502 /// # assert_eq!(stream.next().await.unwrap(), w);
1503 /// # }
1504 /// # }));
1505 /// # }
1506 /// ```
1507 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1508 where
1509 O: IsOrdered,
1510 R: IsExactlyOnce,
1511 {
1512 self.make_totally_ordered().make_exactly_once().fold(
1513 q!(|| vec![]),
1514 q!(|acc, v| {
1515 acc.push(v);
1516 }),
1517 )
1518 }
1519
1520 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1521 /// and emitting each intermediate result.
1522 ///
1523 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1524 /// containing all intermediate accumulated values. The scan operation can also terminate early
1525 /// by returning `None`.
1526 ///
1527 /// The function takes a mutable reference to the accumulator and the current element, and returns
1528 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1529 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1530 ///
1531 /// # Examples
1532 ///
1533 /// Basic usage - running sum:
1534 /// ```rust
1535 /// # #[cfg(feature = "deploy")] {
1536 /// # use hydro_lang::prelude::*;
1537 /// # use futures::StreamExt;
1538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1540 /// q!(|| 0),
1541 /// q!(|acc, x| {
1542 /// *acc += x;
1543 /// Some(*acc)
1544 /// }),
1545 /// )
1546 /// # }, |mut stream| async move {
1547 /// // Output: 1, 3, 6, 10
1548 /// # for w in vec![1, 3, 6, 10] {
1549 /// # assert_eq!(stream.next().await.unwrap(), w);
1550 /// # }
1551 /// # }));
1552 /// # }
1553 /// ```
1554 ///
1555 /// Early termination example:
1556 /// ```rust
1557 /// # #[cfg(feature = "deploy")] {
1558 /// # use hydro_lang::prelude::*;
1559 /// # use futures::StreamExt;
1560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1562 /// q!(|| 1),
1563 /// q!(|state, x| {
1564 /// *state = *state * x;
1565 /// if *state > 6 {
1566 /// None // Terminate the stream
1567 /// } else {
1568 /// Some(-*state)
1569 /// }
1570 /// }),
1571 /// )
1572 /// # }, |mut stream| async move {
1573 /// // Output: -1, -2, -6
1574 /// # for w in vec![-1, -2, -6] {
1575 /// # assert_eq!(stream.next().await.unwrap(), w);
1576 /// # }
1577 /// # }));
1578 /// # }
1579 /// ```
1580 pub fn scan<A, U, I, F>(
1581 self,
1582 init: impl IntoQuotedMut<'a, I, L>,
1583 f: impl IntoQuotedMut<'a, F, L>,
1584 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1585 where
1586 O: IsOrdered,
1587 R: IsExactlyOnce,
1588 I: Fn() -> A + 'a,
1589 F: Fn(&mut A, T) -> Option<U> + 'a,
1590 {
1591 let init = init.splice_fn0_ctx(&self.location).into();
1592 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1593
1594 Stream::new(
1595 self.location.clone(),
1596 HydroNode::Scan {
1597 init,
1598 acc: f,
1599 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1600 metadata: self.location.new_node_metadata(
1601 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1602 ),
1603 },
1604 )
1605 }
1606
1607 /// Iteratively processes the elements of the stream using a state machine that can yield
1608 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1609 /// syntax in Rust, without requiring special syntax.
1610 ///
1611 /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1612 /// state. The second argument defines the processing logic, taking in a mutable reference
1613 /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1614 /// variants define what is emitted and whether further inputs should be processed.
1615 ///
1616 /// # Example
1617 /// ```rust
1618 /// # #[cfg(feature = "deploy")] {
1619 /// # use hydro_lang::prelude::*;
1620 /// # use futures::StreamExt;
1621 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1622 /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1623 /// q!(|| 0),
1624 /// q!(|acc, x| {
1625 /// *acc += x;
1626 /// if *acc > 100 {
1627 /// hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1628 /// } else if *acc % 2 == 0 {
1629 /// hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1630 /// } else {
1631 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1632 /// }
1633 /// }),
1634 /// )
1635 /// # }, |mut stream| async move {
1636 /// // Output: "even", "done!"
1637 /// # let mut results = Vec::new();
1638 /// # for _ in 0..2 {
1639 /// # results.push(stream.next().await.unwrap());
1640 /// # }
1641 /// # results.sort();
1642 /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1643 /// # }));
1644 /// # }
1645 /// ```
1646 pub fn generator<A, U, I, F>(
1647 self,
1648 init: impl IntoQuotedMut<'a, I, L> + Copy,
1649 f: impl IntoQuotedMut<'a, F, L> + Copy,
1650 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1651 where
1652 O: IsOrdered,
1653 R: IsExactlyOnce,
1654 I: Fn() -> A + 'a,
1655 F: Fn(&mut A, T) -> Generate<U> + 'a,
1656 {
1657 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1658 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1659
1660 let this = self.make_totally_ordered().make_exactly_once();
1661
1662 // State is Option<Option<A>>:
1663 // None = not yet initialized
1664 // Some(Some(a)) = active with state a
1665 // Some(None) = terminated
1666 let scan_init = q!(|| None)
1667 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1668 .into();
1669 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1670 if state.is_none() {
1671 *state = Some(Some(init()));
1672 }
1673 match state {
1674 Some(Some(state_value)) => match f(state_value, v) {
1675 Generate::Yield(out) => Some(Some(out)),
1676 Generate::Return(out) => {
1677 *state = Some(None);
1678 Some(Some(out))
1679 }
1680 // Unlike KeyedStream, we can terminate the scan directly on
1681 // Break/Return because there is only one state (no other keys
1682 // that still need processing).
1683 Generate::Break => None,
1684 Generate::Continue => Some(None),
1685 },
1686 // State is Some(None) after Return; terminate the scan.
1687 _ => None,
1688 }
1689 })
1690 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1691 .into();
1692
1693 let scan_node = HydroNode::Scan {
1694 init: scan_init,
1695 acc: scan_f,
1696 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1697 metadata: this.location.new_node_metadata(Stream::<
1698 Option<U>,
1699 L,
1700 B,
1701 TotalOrder,
1702 ExactlyOnce,
1703 >::collection_kind()),
1704 };
1705
1706 let flatten_f = q!(|d| d)
1707 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1708 .into();
1709 let flatten_node = HydroNode::FlatMap {
1710 f: flatten_f,
1711 input: Box::new(scan_node),
1712 metadata: this
1713 .location
1714 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1715 };
1716
1717 Stream::new(this.location.clone(), flatten_node)
1718 }
1719
1720 /// Given a time interval, returns a stream corresponding to samples taken from the
1721 /// stream roughly at that interval. The output will have elements in the same order
1722 /// as the input, but with arbitrary elements skipped between samples. There is also
1723 /// no guarantee on the exact timing of the samples.
1724 ///
1725 /// # Non-Determinism
1726 /// The output stream is non-deterministic in which elements are sampled, since this
1727 /// is controlled by a clock.
1728 pub fn sample_every(
1729 self,
1730 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1731 nondet: NonDet,
1732 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1733 where
1734 L: NoTick + NoAtomic,
1735 {
1736 let samples = self.location.source_interval(interval, nondet);
1737
1738 let tick = self.location.tick();
1739 self.batch(&tick, nondet)
1740 .filter_if(samples.batch(&tick, nondet).first().is_some())
1741 .all_ticks()
1742 .weaken_retries()
1743 }
1744
1745 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1746 /// stream has not emitted a value since that duration.
1747 ///
1748 /// # Non-Determinism
1749 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1750 /// samples take place, timeouts may be non-deterministically generated or missed,
1751 /// and the notification of the timeout may be delayed as well. There is also no
1752 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1753 /// detected based on when the next sample is taken.
1754 pub fn timeout(
1755 self,
1756 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1757 nondet: NonDet,
1758 ) -> Optional<(), L, Unbounded>
1759 where
1760 L: NoTick + NoAtomic,
1761 {
1762 let tick = self.location.tick();
1763
1764 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1765 q!(|| None),
1766 q!(
1767 |latest, _| {
1768 *latest = Some(Instant::now());
1769 },
1770 commutative = manual_proof!(/** TODO */)
1771 ),
1772 );
1773
1774 latest_received
1775 .snapshot(&tick, nondet)
1776 .filter_map(q!(move |latest_received| {
1777 if let Some(latest_received) = latest_received {
1778 if Instant::now().duration_since(latest_received) > duration {
1779 Some(())
1780 } else {
1781 None
1782 }
1783 } else {
1784 Some(())
1785 }
1786 }))
1787 .latest()
1788 }
1789
1790 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1791 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1792 ///
1793 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1794 /// processed before an acknowledgement is emitted.
1795 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1796 let id = self.location.flow_state().borrow_mut().next_clock_id();
1797 let out_location = Atomic {
1798 tick: Tick {
1799 id,
1800 l: self.location.clone(),
1801 },
1802 };
1803 Stream::new(
1804 out_location.clone(),
1805 HydroNode::BeginAtomic {
1806 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1807 metadata: out_location
1808 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1809 },
1810 )
1811 }
1812
1813 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1814 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1815 /// the order of the input. The output stream will execute in the [`Tick`] that was
1816 /// used to create the atomic section.
1817 ///
1818 /// # Non-Determinism
1819 /// The batch boundaries are non-deterministic and may change across executions.
1820 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1821 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1822 Stream::new(
1823 tick.clone(),
1824 HydroNode::Batch {
1825 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1826 metadata: tick
1827 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1828 },
1829 )
1830 }
1831
1832 /// An operator which allows you to "name" a `HydroNode`.
1833 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1834 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1835 {
1836 let mut node = self.ir_node.borrow_mut();
1837 let metadata = node.metadata_mut();
1838 metadata.tag = Some(name.to_owned());
1839 }
1840 self
1841 }
1842
1843 /// Explicitly "casts" the stream to a type with a different ordering
1844 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1845 /// by the type-system.
1846 ///
1847 /// # Non-Determinism
1848 /// This function is used as an escape hatch, and any mistakes in the
1849 /// provided ordering guarantee will propagate into the guarantees
1850 /// for the rest of the program.
1851 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1852 if O::ORDERING_KIND == O2::ORDERING_KIND {
1853 Stream::new(
1854 self.location.clone(),
1855 self.ir_node.replace(HydroNode::Placeholder),
1856 )
1857 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1858 // We can always weaken the ordering guarantee
1859 Stream::new(
1860 self.location.clone(),
1861 HydroNode::Cast {
1862 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1863 metadata: self
1864 .location
1865 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1866 },
1867 )
1868 } else {
1869 Stream::new(
1870 self.location.clone(),
1871 HydroNode::ObserveNonDet {
1872 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1873 trusted: false,
1874 metadata: self
1875 .location
1876 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1877 },
1878 )
1879 }
1880 }
1881
1882 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1883 // intermediate states will not be revealed
1884 fn assume_ordering_trusted_bounded<O2: Ordering>(
1885 self,
1886 nondet: NonDet,
1887 ) -> Stream<T, L, B, O2, R> {
1888 if B::BOUNDED {
1889 self.assume_ordering_trusted(nondet)
1890 } else {
1891 self.assume_ordering(nondet)
1892 }
1893 }
1894
1895 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1896 // is not observable
1897 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1898 self,
1899 _nondet: NonDet,
1900 ) -> Stream<T, L, B, O2, R> {
1901 if O::ORDERING_KIND == O2::ORDERING_KIND {
1902 Stream::new(
1903 self.location.clone(),
1904 self.ir_node.replace(HydroNode::Placeholder),
1905 )
1906 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1907 // We can always weaken the ordering guarantee
1908 Stream::new(
1909 self.location.clone(),
1910 HydroNode::Cast {
1911 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1912 metadata: self
1913 .location
1914 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1915 },
1916 )
1917 } else {
1918 Stream::new(
1919 self.location.clone(),
1920 HydroNode::ObserveNonDet {
1921 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922 trusted: true,
1923 metadata: self
1924 .location
1925 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1926 },
1927 )
1928 }
1929 }
1930
1931 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1932 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1933 /// which is always safe because that is the weakest possible guarantee.
1934 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1935 self.weaken_ordering::<NoOrder>()
1936 }
1937
1938 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1939 /// enforcing that `O2` is weaker than the input ordering guarantee.
1940 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1941 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1942 self.assume_ordering::<O2>(nondet)
1943 }
1944
1945 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1946 /// implies that `O == TotalOrder`.
1947 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1948 where
1949 O: IsOrdered,
1950 {
1951 self.assume_ordering(nondet!(/** no-op */))
1952 }
1953
1954 /// Explicitly "casts" the stream to a type with a different retries
1955 /// guarantee. Useful in unsafe code where the lack of retries cannot
1956 /// be proven by the type-system.
1957 ///
1958 /// # Non-Determinism
1959 /// This function is used as an escape hatch, and any mistakes in the
1960 /// provided retries guarantee will propagate into the guarantees
1961 /// for the rest of the program.
1962 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1963 if R::RETRIES_KIND == R2::RETRIES_KIND {
1964 Stream::new(
1965 self.location.clone(),
1966 self.ir_node.replace(HydroNode::Placeholder),
1967 )
1968 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1969 // We can always weaken the retries guarantee
1970 Stream::new(
1971 self.location.clone(),
1972 HydroNode::Cast {
1973 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1974 metadata: self
1975 .location
1976 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1977 },
1978 )
1979 } else {
1980 Stream::new(
1981 self.location.clone(),
1982 HydroNode::ObserveNonDet {
1983 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1984 trusted: false,
1985 metadata: self
1986 .location
1987 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1988 },
1989 )
1990 }
1991 }
1992
1993 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1994 // is not observable
1995 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1996 if R::RETRIES_KIND == R2::RETRIES_KIND {
1997 Stream::new(
1998 self.location.clone(),
1999 self.ir_node.replace(HydroNode::Placeholder),
2000 )
2001 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2002 // We can always weaken the retries guarantee
2003 Stream::new(
2004 self.location.clone(),
2005 HydroNode::Cast {
2006 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2007 metadata: self
2008 .location
2009 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2010 },
2011 )
2012 } else {
2013 Stream::new(
2014 self.location.clone(),
2015 HydroNode::ObserveNonDet {
2016 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2017 trusted: true,
2018 metadata: self
2019 .location
2020 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2021 },
2022 )
2023 }
2024 }
2025
2026 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2027 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2028 /// which is always safe because that is the weakest possible guarantee.
2029 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2030 self.weaken_retries::<AtLeastOnce>()
2031 }
2032
2033 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2034 /// enforcing that `R2` is weaker than the input retries guarantee.
2035 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2036 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2037 self.assume_retries::<R2>(nondet)
2038 }
2039
2040 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2041 /// implies that `R == ExactlyOnce`.
2042 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2043 where
2044 R: IsExactlyOnce,
2045 {
2046 self.assume_retries(nondet!(/** no-op */))
2047 }
2048
2049 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2050 /// implies that `B == Bounded`.
2051 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2052 where
2053 B: IsBounded,
2054 {
2055 Stream::new(
2056 self.location.clone(),
2057 self.ir_node.replace(HydroNode::Placeholder),
2058 )
2059 }
2060}
2061
2062impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2063where
2064 L: Location<'a>,
2065{
2066 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2067 ///
2068 /// # Example
2069 /// ```rust
2070 /// # #[cfg(feature = "deploy")] {
2071 /// # use hydro_lang::prelude::*;
2072 /// # use futures::StreamExt;
2073 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2074 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2075 /// # }, |mut stream| async move {
2076 /// // 1, 2, 3
2077 /// # for w in vec![1, 2, 3] {
2078 /// # assert_eq!(stream.next().await.unwrap(), w);
2079 /// # }
2080 /// # }));
2081 /// # }
2082 /// ```
2083 pub fn cloned(self) -> Stream<T, L, B, O, R>
2084 where
2085 T: Clone,
2086 {
2087 self.map(q!(|d| d.clone()))
2088 }
2089}
2090
2091impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2092where
2093 L: Location<'a>,
2094{
2095 /// Computes the number of elements in the stream as a [`Singleton`].
2096 ///
2097 /// # Example
2098 /// ```rust
2099 /// # #[cfg(feature = "deploy")] {
2100 /// # use hydro_lang::prelude::*;
2101 /// # use futures::StreamExt;
2102 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2103 /// let tick = process.tick();
2104 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2105 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2106 /// batch.count().all_ticks()
2107 /// # }, |mut stream| async move {
2108 /// // 4
2109 /// # assert_eq!(stream.next().await.unwrap(), 4);
2110 /// # }));
2111 /// # }
2112 /// ```
2113 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2114 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2115 /// Order does not affect eventual count, and also does not affect intermediate states.
2116 ))
2117 .fold(
2118 q!(|| 0usize),
2119 q!(
2120 |count, _| *count += 1,
2121 monotone = manual_proof!(/** += 1 is monotone */)
2122 ),
2123 )
2124 }
2125}
2126
2127impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2128 /// Produces a new stream that merges the elements of the two input streams.
2129 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2130 ///
2131 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2132 /// [`Bounded`], you can use [`Stream::chain`] instead.
2133 ///
2134 /// # Example
2135 /// ```rust
2136 /// # #[cfg(feature = "deploy")] {
2137 /// # use hydro_lang::prelude::*;
2138 /// # use futures::StreamExt;
2139 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2140 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2141 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2142 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2143 /// # }, |mut stream| async move {
2144 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2145 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2146 /// # assert_eq!(stream.next().await.unwrap(), w);
2147 /// # }
2148 /// # }));
2149 /// # }
2150 /// ```
2151 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2152 self,
2153 other: Stream<T, L, Unbounded, O2, R2>,
2154 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2155 where
2156 R: MinRetries<R2>,
2157 {
2158 Stream::new(
2159 self.location.clone(),
2160 HydroNode::Chain {
2161 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2162 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2163 metadata: self.location.new_node_metadata(Stream::<
2164 T,
2165 L,
2166 Unbounded,
2167 NoOrder,
2168 <R as MinRetries<R2>>::Min,
2169 >::collection_kind()),
2170 },
2171 )
2172 }
2173
2174 /// Deprecated: use [`Stream::merge_unordered`] instead.
2175 #[deprecated(note = "use `merge_unordered` instead")]
2176 pub fn interleave<O2: Ordering, R2: Retries>(
2177 self,
2178 other: Stream<T, L, Unbounded, O2, R2>,
2179 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2180 where
2181 R: MinRetries<R2>,
2182 {
2183 self.merge_unordered(other)
2184 }
2185}
2186
2187impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2188 /// Produces a new stream that combines the elements of the two input streams,
2189 /// preserving the relative order of elements within each input.
2190 ///
2191 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2192 /// [`Bounded`], you can use [`Stream::chain`] instead.
2193 ///
2194 /// # Non-Determinism
2195 /// The order in which elements *across* the two streams will be interleaved is
2196 /// non-deterministic, so the order of elements will vary across runs. If the output order
2197 /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2198 /// unordered stream.
2199 ///
2200 /// # Example
2201 /// ```rust
2202 /// # #[cfg(feature = "deploy")] {
2203 /// # use hydro_lang::prelude::*;
2204 /// # use futures::StreamExt;
2205 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2206 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2207 /// # process.source_iter(q!(vec![1, 3])).into();
2208 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2209 /// # }, |mut stream| async move {
2210 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2211 /// # for w in vec![1, 3, 2, 4] {
2212 /// # assert_eq!(stream.next().await.unwrap(), w);
2213 /// # }
2214 /// # }));
2215 /// # }
2216 /// ```
2217 pub fn merge_ordered<R2: Retries>(
2218 self,
2219 other: Stream<T, L, Unbounded, TotalOrder, R2>,
2220 _nondet: NonDet,
2221 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2222 where
2223 R: MinRetries<R2>,
2224 {
2225 Stream::new(
2226 self.location.clone(),
2227 HydroNode::Chain {
2228 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2229 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2230 metadata: self.location.new_node_metadata(Stream::<
2231 T,
2232 L,
2233 Unbounded,
2234 TotalOrder,
2235 <R as MinRetries<R2>>::Min,
2236 >::collection_kind()),
2237 },
2238 )
2239 }
2240}
2241
2242impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2243where
2244 L: Location<'a>,
2245{
2246 /// Produces a new stream that emits the input elements in sorted order.
2247 ///
2248 /// The input stream can have any ordering guarantee, but the output stream
2249 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2250 /// elements in the input stream are available, so it requires the input stream
2251 /// to be [`Bounded`].
2252 ///
2253 /// # Example
2254 /// ```rust
2255 /// # #[cfg(feature = "deploy")] {
2256 /// # use hydro_lang::prelude::*;
2257 /// # use futures::StreamExt;
2258 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2259 /// let tick = process.tick();
2260 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2261 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2262 /// batch.sort().all_ticks()
2263 /// # }, |mut stream| async move {
2264 /// // 1, 2, 3, 4
2265 /// # for w in (1..5) {
2266 /// # assert_eq!(stream.next().await.unwrap(), w);
2267 /// # }
2268 /// # }));
2269 /// # }
2270 /// ```
2271 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2272 where
2273 B: IsBounded,
2274 T: Ord,
2275 {
2276 let this = self.make_bounded();
2277 Stream::new(
2278 this.location.clone(),
2279 HydroNode::Sort {
2280 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2281 metadata: this
2282 .location
2283 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2284 },
2285 )
2286 }
2287
2288 /// Produces a new stream that first emits the elements of the `self` stream,
2289 /// and then emits the elements of the `other` stream. The output stream has
2290 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2291 /// [`TotalOrder`] guarantee.
2292 ///
2293 /// Currently, both input streams must be [`Bounded`]. This operator will block
2294 /// on the first stream until all its elements are available. In a future version,
2295 /// we will relax the requirement on the `other` stream.
2296 ///
2297 /// # Example
2298 /// ```rust
2299 /// # #[cfg(feature = "deploy")] {
2300 /// # use hydro_lang::prelude::*;
2301 /// # use futures::StreamExt;
2302 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2303 /// let tick = process.tick();
2304 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2305 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2306 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2307 /// # }, |mut stream| async move {
2308 /// // 2, 3, 4, 5, 1, 2, 3, 4
2309 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2310 /// # assert_eq!(stream.next().await.unwrap(), w);
2311 /// # }
2312 /// # }));
2313 /// # }
2314 /// ```
2315 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2316 self,
2317 other: Stream<T, L, B2, O2, R2>,
2318 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2319 where
2320 B: IsBounded,
2321 O: MinOrder<O2>,
2322 R: MinRetries<R2>,
2323 {
2324 check_matching_location(&self.location, &other.location);
2325
2326 Stream::new(
2327 self.location.clone(),
2328 HydroNode::Chain {
2329 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2330 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2331 metadata: self.location.new_node_metadata(Stream::<
2332 T,
2333 L,
2334 B2,
2335 <O as MinOrder<O2>>::Min,
2336 <R as MinRetries<R2>>::Min,
2337 >::collection_kind()),
2338 },
2339 )
2340 }
2341
2342 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2343 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2344 /// because this is compiled into a nested loop.
2345 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2346 self,
2347 other: Stream<T2, L, Bounded, O2, R>,
2348 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2349 where
2350 B: IsBounded,
2351 T: Clone,
2352 T2: Clone,
2353 {
2354 let this = self.make_bounded();
2355 check_matching_location(&this.location, &other.location);
2356
2357 Stream::new(
2358 this.location.clone(),
2359 HydroNode::CrossProduct {
2360 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2361 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2362 metadata: this.location.new_node_metadata(Stream::<
2363 (T, T2),
2364 L,
2365 Bounded,
2366 <O2 as MinOrder<O>>::Min,
2367 R,
2368 >::collection_kind()),
2369 },
2370 )
2371 }
2372
2373 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2374 /// `self` used as the values for *each* key.
2375 ///
2376 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2377 /// values. For example, it can be used to send the same set of elements to several cluster
2378 /// members, if the membership information is available as a [`KeyedSingleton`].
2379 ///
2380 /// # Example
2381 /// ```rust
2382 /// # #[cfg(feature = "deploy")] {
2383 /// # use hydro_lang::prelude::*;
2384 /// # use futures::StreamExt;
2385 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2386 /// # let tick = process.tick();
2387 /// let keyed_singleton = // { 1: (), 2: () }
2388 /// # process
2389 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2390 /// # .into_keyed()
2391 /// # .batch(&tick, nondet!(/** test */))
2392 /// # .first();
2393 /// let stream = // [ "a", "b" ]
2394 /// # process
2395 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2396 /// # .batch(&tick, nondet!(/** test */));
2397 /// stream.repeat_with_keys(keyed_singleton)
2398 /// # .entries().all_ticks()
2399 /// # }, |mut stream| async move {
2400 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2401 /// # let mut results = Vec::new();
2402 /// # for _ in 0..4 {
2403 /// # results.push(stream.next().await.unwrap());
2404 /// # }
2405 /// # results.sort();
2406 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2407 /// # }));
2408 /// # }
2409 /// ```
2410 pub fn repeat_with_keys<K, V2>(
2411 self,
2412 keys: KeyedSingleton<K, V2, L, Bounded>,
2413 ) -> KeyedStream<K, T, L, Bounded, O, R>
2414 where
2415 B: IsBounded,
2416 K: Clone,
2417 T: Clone,
2418 {
2419 keys.keys()
2420 .weaken_retries()
2421 .assume_ordering_trusted::<TotalOrder>(
2422 nondet!(/** keyed stream does not depend on ordering of keys */),
2423 )
2424 .cross_product_nested_loop(self.make_bounded())
2425 .into_keyed()
2426 }
2427
2428 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2429 /// execution until all results are available. The output order is based on when futures
2430 /// complete, and may be different than the input order.
2431 ///
2432 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2433 /// while futures are pending, this variant blocks until the futures resolve.
2434 ///
2435 /// # Example
2436 /// ```rust
2437 /// # #[cfg(feature = "deploy")] {
2438 /// # use std::collections::HashSet;
2439 /// # use futures::StreamExt;
2440 /// # use hydro_lang::prelude::*;
2441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2442 /// process
2443 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2444 /// .map(q!(|x| async move {
2445 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2446 /// x
2447 /// }))
2448 /// .resolve_futures_blocking()
2449 /// # },
2450 /// # |mut stream| async move {
2451 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2452 /// # let mut output = HashSet::new();
2453 /// # for _ in 1..10 {
2454 /// # output.insert(stream.next().await.unwrap());
2455 /// # }
2456 /// # assert_eq!(
2457 /// # output,
2458 /// # HashSet::<i32>::from_iter(1..10)
2459 /// # );
2460 /// # },
2461 /// # ));
2462 /// # }
2463 /// ```
2464 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2465 where
2466 T: Future,
2467 {
2468 Stream::new(
2469 self.location.clone(),
2470 HydroNode::ResolveFuturesBlocking {
2471 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2472 metadata: self
2473 .location
2474 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2475 },
2476 )
2477 }
2478
2479 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2480 ///
2481 /// # Example
2482 /// ```rust
2483 /// # #[cfg(feature = "deploy")] {
2484 /// # use hydro_lang::prelude::*;
2485 /// # use futures::StreamExt;
2486 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2487 /// let tick = process.tick();
2488 /// let empty: Stream<i32, _, Bounded> = process
2489 /// .source_iter(q!(Vec::<i32>::new()))
2490 /// .batch(&tick, nondet!(/** test */));
2491 /// empty.is_empty().all_ticks()
2492 /// # }, |mut stream| async move {
2493 /// // true
2494 /// # assert_eq!(stream.next().await.unwrap(), true);
2495 /// # }));
2496 /// # }
2497 /// ```
2498 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2499 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2500 where
2501 B: IsBounded,
2502 {
2503 self.make_bounded()
2504 .assume_ordering_trusted::<TotalOrder>(
2505 nondet!(/** is_empty intermediates unaffected by order */),
2506 )
2507 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** is_empty is idempotent */))
2508 .fold(q!(|| true), q!(|empty, _| { *empty = false },))
2509 }
2510}
2511
2512impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2513where
2514 L: Location<'a>,
2515{
2516 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2517 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2518 /// by equi-joining the two streams on the key attribute `K`.
2519 ///
2520 /// # Example
2521 /// ```rust
2522 /// # #[cfg(feature = "deploy")] {
2523 /// # use hydro_lang::prelude::*;
2524 /// # use std::collections::HashSet;
2525 /// # use futures::StreamExt;
2526 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2527 /// let tick = process.tick();
2528 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2529 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2530 /// stream1.join(stream2)
2531 /// # }, |mut stream| async move {
2532 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2533 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2534 /// # stream.map(|i| assert!(expected.contains(&i)));
2535 /// # }));
2536 /// # }
2537 pub fn join<V2, O2: Ordering, R2: Retries>(
2538 self,
2539 n: Stream<(K, V2), L, B, O2, R2>,
2540 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2541 where
2542 K: Eq + Hash,
2543 R: MinRetries<R2>,
2544 {
2545 check_matching_location(&self.location, &n.location);
2546
2547 Stream::new(
2548 self.location.clone(),
2549 HydroNode::Join {
2550 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2551 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2552 metadata: self.location.new_node_metadata(Stream::<
2553 (K, (V1, V2)),
2554 L,
2555 B,
2556 NoOrder,
2557 <R as MinRetries<R2>>::Min,
2558 >::collection_kind()),
2559 },
2560 )
2561 }
2562
2563 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2564 /// computes the anti-join of the items in the input -- i.e. returns
2565 /// unique items in the first input that do not have a matching key
2566 /// in the second input.
2567 ///
2568 /// # Example
2569 /// ```rust
2570 /// # #[cfg(feature = "deploy")] {
2571 /// # use hydro_lang::prelude::*;
2572 /// # use futures::StreamExt;
2573 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2574 /// let tick = process.tick();
2575 /// let stream = process
2576 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2577 /// .batch(&tick, nondet!(/** test */));
2578 /// let batch = process
2579 /// .source_iter(q!(vec![1, 2]))
2580 /// .batch(&tick, nondet!(/** test */));
2581 /// stream.anti_join(batch).all_ticks()
2582 /// # }, |mut stream| async move {
2583 /// # for w in vec![(3, 'c'), (4, 'd')] {
2584 /// # assert_eq!(stream.next().await.unwrap(), w);
2585 /// # }
2586 /// # }));
2587 /// # }
2588 pub fn anti_join<O2: Ordering, R2: Retries>(
2589 self,
2590 n: Stream<K, L, Bounded, O2, R2>,
2591 ) -> Stream<(K, V1), L, B, O, R>
2592 where
2593 K: Eq + Hash,
2594 {
2595 check_matching_location(&self.location, &n.location);
2596
2597 Stream::new(
2598 self.location.clone(),
2599 HydroNode::AntiJoin {
2600 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2601 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2602 metadata: self
2603 .location
2604 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2605 },
2606 )
2607 }
2608}
2609
2610impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2611 Stream<(K, V), L, B, O, R>
2612{
2613 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2614 /// is used as the key and the second element is added to the entries associated with that key.
2615 ///
2616 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2617 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2618 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2619 /// total ordering _within_ each group but no ordering _across_ groups.
2620 ///
2621 /// # Example
2622 /// ```rust
2623 /// # #[cfg(feature = "deploy")] {
2624 /// # use hydro_lang::prelude::*;
2625 /// # use futures::StreamExt;
2626 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2627 /// process
2628 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2629 /// .into_keyed()
2630 /// # .entries()
2631 /// # }, |mut stream| async move {
2632 /// // { 1: [2, 3], 2: [4] }
2633 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2634 /// # assert_eq!(stream.next().await.unwrap(), w);
2635 /// # }
2636 /// # }));
2637 /// # }
2638 /// ```
2639 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2640 KeyedStream::new(
2641 self.location.clone(),
2642 HydroNode::Cast {
2643 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2644 metadata: self
2645 .location
2646 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2647 },
2648 )
2649 }
2650}
2651
2652impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2653where
2654 K: Eq + Hash,
2655 L: Location<'a>,
2656{
2657 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2658 /// # Example
2659 /// ```rust
2660 /// # #[cfg(feature = "deploy")] {
2661 /// # use hydro_lang::prelude::*;
2662 /// # use futures::StreamExt;
2663 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2664 /// let tick = process.tick();
2665 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2666 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2667 /// batch.keys().all_ticks()
2668 /// # }, |mut stream| async move {
2669 /// // 1, 2
2670 /// # assert_eq!(stream.next().await.unwrap(), 1);
2671 /// # assert_eq!(stream.next().await.unwrap(), 2);
2672 /// # }));
2673 /// # }
2674 /// ```
2675 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2676 self.into_keyed()
2677 .fold(
2678 q!(|| ()),
2679 q!(
2680 |_, _| {},
2681 commutative = manual_proof!(/** values are ignored */),
2682 idempotent = manual_proof!(/** values are ignored */)
2683 ),
2684 )
2685 .keys()
2686 }
2687}
2688
2689impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2690where
2691 L: Location<'a> + NoTick,
2692{
2693 /// Returns a stream corresponding to the latest batch of elements being atomically
2694 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2695 /// the order of the input.
2696 ///
2697 /// # Non-Determinism
2698 /// The batch boundaries are non-deterministic and may change across executions.
2699 pub fn batch_atomic(
2700 self,
2701 tick: &Tick<L>,
2702 _nondet: NonDet,
2703 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2704 Stream::new(
2705 tick.clone(),
2706 HydroNode::Batch {
2707 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2708 metadata: tick
2709 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2710 },
2711 )
2712 }
2713
2714 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2715 /// See [`Stream::atomic`] for more details.
2716 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2717 Stream::new(
2718 self.location.tick.l.clone(),
2719 HydroNode::EndAtomic {
2720 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2721 metadata: self
2722 .location
2723 .tick
2724 .l
2725 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2726 },
2727 )
2728 }
2729}
2730
2731impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2732where
2733 L: Location<'a> + NoTick + NoAtomic,
2734 F: Future<Output = T>,
2735{
2736 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2737 /// Future outputs are produced as available, regardless of input arrival order.
2738 ///
2739 /// # Example
2740 /// ```rust
2741 /// # #[cfg(feature = "deploy")] {
2742 /// # use std::collections::HashSet;
2743 /// # use futures::StreamExt;
2744 /// # use hydro_lang::prelude::*;
2745 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2746 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2747 /// .map(q!(|x| async move {
2748 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2749 /// x
2750 /// }))
2751 /// .resolve_futures()
2752 /// # },
2753 /// # |mut stream| async move {
2754 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2755 /// # let mut output = HashSet::new();
2756 /// # for _ in 1..10 {
2757 /// # output.insert(stream.next().await.unwrap());
2758 /// # }
2759 /// # assert_eq!(
2760 /// # output,
2761 /// # HashSet::<i32>::from_iter(1..10)
2762 /// # );
2763 /// # },
2764 /// # ));
2765 /// # }
2766 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2767 Stream::new(
2768 self.location.clone(),
2769 HydroNode::ResolveFutures {
2770 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2771 metadata: self
2772 .location
2773 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2774 },
2775 )
2776 }
2777
2778 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2779 /// Future outputs are produced in the same order as the input stream.
2780 ///
2781 /// # Example
2782 /// ```rust
2783 /// # #[cfg(feature = "deploy")] {
2784 /// # use std::collections::HashSet;
2785 /// # use futures::StreamExt;
2786 /// # use hydro_lang::prelude::*;
2787 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2788 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2789 /// .map(q!(|x| async move {
2790 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2791 /// x
2792 /// }))
2793 /// .resolve_futures_ordered()
2794 /// # },
2795 /// # |mut stream| async move {
2796 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2797 /// # let mut output = Vec::new();
2798 /// # for _ in 1..10 {
2799 /// # output.push(stream.next().await.unwrap());
2800 /// # }
2801 /// # assert_eq!(
2802 /// # output,
2803 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2804 /// # );
2805 /// # },
2806 /// # ));
2807 /// # }
2808 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2809 Stream::new(
2810 self.location.clone(),
2811 HydroNode::ResolveFuturesOrdered {
2812 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2813 metadata: self
2814 .location
2815 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2816 },
2817 )
2818 }
2819}
2820
2821impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2822where
2823 L: Location<'a>,
2824{
2825 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2826 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2827 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2828 Stream::new(
2829 self.location.outer().clone(),
2830 HydroNode::YieldConcat {
2831 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2832 metadata: self
2833 .location
2834 .outer()
2835 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2836 },
2837 )
2838 }
2839
2840 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2841 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2842 ///
2843 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2844 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2845 /// stream's [`Tick`] context.
2846 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2847 let out_location = Atomic {
2848 tick: self.location.clone(),
2849 };
2850
2851 Stream::new(
2852 out_location.clone(),
2853 HydroNode::YieldConcat {
2854 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2855 metadata: out_location
2856 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2857 },
2858 )
2859 }
2860
2861 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2862 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2863 /// input.
2864 ///
2865 /// This API is particularly useful for stateful computation on batches of data, such as
2866 /// maintaining an accumulated state that is up to date with the current batch.
2867 ///
2868 /// # Example
2869 /// ```rust
2870 /// # #[cfg(feature = "deploy")] {
2871 /// # use hydro_lang::prelude::*;
2872 /// # use futures::StreamExt;
2873 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2874 /// let tick = process.tick();
2875 /// # // ticks are lazy by default, forces the second tick to run
2876 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2877 /// # let batch_first_tick = process
2878 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2879 /// # .batch(&tick, nondet!(/** test */));
2880 /// # let batch_second_tick = process
2881 /// # .source_iter(q!(vec![5, 6, 7]))
2882 /// # .batch(&tick, nondet!(/** test */))
2883 /// # .defer_tick(); // appears on the second tick
2884 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2885 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2886 ///
2887 /// input.batch(&tick, nondet!(/** test */))
2888 /// .across_ticks(|s| s.count()).all_ticks()
2889 /// # }, |mut stream| async move {
2890 /// // [4, 7]
2891 /// assert_eq!(stream.next().await.unwrap(), 4);
2892 /// assert_eq!(stream.next().await.unwrap(), 7);
2893 /// # }));
2894 /// # }
2895 /// ```
2896 pub fn across_ticks<Out: BatchAtomic>(
2897 self,
2898 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2899 ) -> Out::Batched {
2900 thunk(self.all_ticks_atomic()).batched_atomic()
2901 }
2902
2903 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2904 /// always has the elements of `self` at tick `T - 1`.
2905 ///
2906 /// At tick `0`, the output stream is empty, since there is no previous tick.
2907 ///
2908 /// This operator enables stateful iterative processing with ticks, by sending data from one
2909 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2910 ///
2911 /// # Example
2912 /// ```rust
2913 /// # #[cfg(feature = "deploy")] {
2914 /// # use hydro_lang::prelude::*;
2915 /// # use futures::StreamExt;
2916 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2917 /// let tick = process.tick();
2918 /// // ticks are lazy by default, forces the second tick to run
2919 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2920 ///
2921 /// let batch_first_tick = process
2922 /// .source_iter(q!(vec![1, 2, 3, 4]))
2923 /// .batch(&tick, nondet!(/** test */));
2924 /// let batch_second_tick = process
2925 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2926 /// .batch(&tick, nondet!(/** test */))
2927 /// .defer_tick(); // appears on the second tick
2928 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2929 ///
2930 /// changes_across_ticks.clone().filter_not_in(
2931 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2932 /// ).all_ticks()
2933 /// # }, |mut stream| async move {
2934 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2935 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2936 /// # assert_eq!(stream.next().await.unwrap(), w);
2937 /// # }
2938 /// # }));
2939 /// # }
2940 /// ```
2941 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2942 Stream::new(
2943 self.location.clone(),
2944 HydroNode::DeferTick {
2945 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2946 metadata: self
2947 .location
2948 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2949 },
2950 )
2951 }
2952}
2953
2954#[cfg(test)]
2955mod tests {
2956 #[cfg(feature = "deploy")]
2957 use futures::{SinkExt, StreamExt};
2958 #[cfg(feature = "deploy")]
2959 use hydro_deploy::Deployment;
2960 #[cfg(feature = "deploy")]
2961 use serde::{Deserialize, Serialize};
2962 #[cfg(any(feature = "deploy", feature = "sim"))]
2963 use stageleft::q;
2964
2965 #[cfg(any(feature = "deploy", feature = "sim"))]
2966 use crate::compile::builder::FlowBuilder;
2967 #[cfg(feature = "deploy")]
2968 use crate::live_collections::sliced::sliced;
2969 #[cfg(feature = "deploy")]
2970 use crate::live_collections::stream::ExactlyOnce;
2971 #[cfg(feature = "sim")]
2972 use crate::live_collections::stream::NoOrder;
2973 #[cfg(any(feature = "deploy", feature = "sim"))]
2974 use crate::live_collections::stream::TotalOrder;
2975 #[cfg(any(feature = "deploy", feature = "sim"))]
2976 use crate::location::Location;
2977 #[cfg(any(feature = "deploy", feature = "sim"))]
2978 use crate::nondet::nondet;
2979
2980 mod backtrace_chained_ops;
2981
2982 #[cfg(feature = "deploy")]
2983 struct P1 {}
2984 #[cfg(feature = "deploy")]
2985 struct P2 {}
2986
2987 #[cfg(feature = "deploy")]
2988 #[derive(Serialize, Deserialize, Debug)]
2989 struct SendOverNetwork {
2990 n: u32,
2991 }
2992
2993 #[cfg(feature = "deploy")]
2994 #[tokio::test]
2995 async fn first_ten_distributed() {
2996 use crate::networking::TCP;
2997
2998 let mut deployment = Deployment::new();
2999
3000 let mut flow = FlowBuilder::new();
3001 let first_node = flow.process::<P1>();
3002 let second_node = flow.process::<P2>();
3003 let external = flow.external::<P2>();
3004
3005 let numbers = first_node.source_iter(q!(0..10));
3006 let out_port = numbers
3007 .map(q!(|n| SendOverNetwork { n }))
3008 .send(&second_node, TCP.fail_stop().bincode())
3009 .send_bincode_external(&external);
3010
3011 let nodes = flow
3012 .with_process(&first_node, deployment.Localhost())
3013 .with_process(&second_node, deployment.Localhost())
3014 .with_external(&external, deployment.Localhost())
3015 .deploy(&mut deployment);
3016
3017 deployment.deploy().await.unwrap();
3018
3019 let mut external_out = nodes.connect(out_port).await;
3020
3021 deployment.start().await.unwrap();
3022
3023 for i in 0..10 {
3024 assert_eq!(external_out.next().await.unwrap().n, i);
3025 }
3026 }
3027
3028 #[cfg(feature = "deploy")]
3029 #[tokio::test]
3030 async fn first_cardinality() {
3031 let mut deployment = Deployment::new();
3032
3033 let mut flow = FlowBuilder::new();
3034 let node = flow.process::<()>();
3035 let external = flow.external::<()>();
3036
3037 let node_tick = node.tick();
3038 let count = node_tick
3039 .singleton(q!([1, 2, 3]))
3040 .into_stream()
3041 .flatten_ordered()
3042 .first()
3043 .into_stream()
3044 .count()
3045 .all_ticks()
3046 .send_bincode_external(&external);
3047
3048 let nodes = flow
3049 .with_process(&node, deployment.Localhost())
3050 .with_external(&external, deployment.Localhost())
3051 .deploy(&mut deployment);
3052
3053 deployment.deploy().await.unwrap();
3054
3055 let mut external_out = nodes.connect(count).await;
3056
3057 deployment.start().await.unwrap();
3058
3059 assert_eq!(external_out.next().await.unwrap(), 1);
3060 }
3061
3062 #[cfg(feature = "deploy")]
3063 #[tokio::test]
3064 async fn unbounded_reduce_remembers_state() {
3065 let mut deployment = Deployment::new();
3066
3067 let mut flow = FlowBuilder::new();
3068 let node = flow.process::<()>();
3069 let external = flow.external::<()>();
3070
3071 let (input_port, input) = node.source_external_bincode(&external);
3072 let out = input
3073 .reduce(q!(|acc, v| *acc += v))
3074 .sample_eager(nondet!(/** test */))
3075 .send_bincode_external(&external);
3076
3077 let nodes = flow
3078 .with_process(&node, deployment.Localhost())
3079 .with_external(&external, deployment.Localhost())
3080 .deploy(&mut deployment);
3081
3082 deployment.deploy().await.unwrap();
3083
3084 let mut external_in = nodes.connect(input_port).await;
3085 let mut external_out = nodes.connect(out).await;
3086
3087 deployment.start().await.unwrap();
3088
3089 external_in.send(1).await.unwrap();
3090 assert_eq!(external_out.next().await.unwrap(), 1);
3091
3092 external_in.send(2).await.unwrap();
3093 assert_eq!(external_out.next().await.unwrap(), 3);
3094 }
3095
3096 #[cfg(feature = "deploy")]
3097 #[tokio::test]
3098 async fn top_level_bounded_cross_singleton() {
3099 let mut deployment = Deployment::new();
3100
3101 let mut flow = FlowBuilder::new();
3102 let node = flow.process::<()>();
3103 let external = flow.external::<()>();
3104
3105 let (input_port, input) =
3106 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3107
3108 let out = input
3109 .cross_singleton(
3110 node.source_iter(q!(vec![1, 2, 3]))
3111 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3112 )
3113 .send_bincode_external(&external);
3114
3115 let nodes = flow
3116 .with_process(&node, deployment.Localhost())
3117 .with_external(&external, deployment.Localhost())
3118 .deploy(&mut deployment);
3119
3120 deployment.deploy().await.unwrap();
3121
3122 let mut external_in = nodes.connect(input_port).await;
3123 let mut external_out = nodes.connect(out).await;
3124
3125 deployment.start().await.unwrap();
3126
3127 external_in.send(1).await.unwrap();
3128 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3129
3130 external_in.send(2).await.unwrap();
3131 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3132 }
3133
3134 #[cfg(feature = "deploy")]
3135 #[tokio::test]
3136 async fn top_level_bounded_reduce_cardinality() {
3137 let mut deployment = Deployment::new();
3138
3139 let mut flow = FlowBuilder::new();
3140 let node = flow.process::<()>();
3141 let external = flow.external::<()>();
3142
3143 let (input_port, input) =
3144 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3145
3146 let out = sliced! {
3147 let input = use(input, nondet!(/** test */));
3148 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3149 input.cross_singleton(v.into_stream().count())
3150 }
3151 .send_bincode_external(&external);
3152
3153 let nodes = flow
3154 .with_process(&node, deployment.Localhost())
3155 .with_external(&external, deployment.Localhost())
3156 .deploy(&mut deployment);
3157
3158 deployment.deploy().await.unwrap();
3159
3160 let mut external_in = nodes.connect(input_port).await;
3161 let mut external_out = nodes.connect(out).await;
3162
3163 deployment.start().await.unwrap();
3164
3165 external_in.send(1).await.unwrap();
3166 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3167
3168 external_in.send(2).await.unwrap();
3169 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3170 }
3171
3172 #[cfg(feature = "deploy")]
3173 #[tokio::test]
3174 async fn top_level_bounded_into_singleton_cardinality() {
3175 let mut deployment = Deployment::new();
3176
3177 let mut flow = FlowBuilder::new();
3178 let node = flow.process::<()>();
3179 let external = flow.external::<()>();
3180
3181 let (input_port, input) =
3182 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3183
3184 let out = sliced! {
3185 let input = use(input, nondet!(/** test */));
3186 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3187 input.cross_singleton(v.into_stream().count())
3188 }
3189 .send_bincode_external(&external);
3190
3191 let nodes = flow
3192 .with_process(&node, deployment.Localhost())
3193 .with_external(&external, deployment.Localhost())
3194 .deploy(&mut deployment);
3195
3196 deployment.deploy().await.unwrap();
3197
3198 let mut external_in = nodes.connect(input_port).await;
3199 let mut external_out = nodes.connect(out).await;
3200
3201 deployment.start().await.unwrap();
3202
3203 external_in.send(1).await.unwrap();
3204 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3205
3206 external_in.send(2).await.unwrap();
3207 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3208 }
3209
3210 #[cfg(feature = "deploy")]
3211 #[tokio::test]
3212 async fn atomic_fold_replays_each_tick() {
3213 let mut deployment = Deployment::new();
3214
3215 let mut flow = FlowBuilder::new();
3216 let node = flow.process::<()>();
3217 let external = flow.external::<()>();
3218
3219 let (input_port, input) =
3220 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3221 let tick = node.tick();
3222
3223 let out = input
3224 .batch(&tick, nondet!(/** test */))
3225 .cross_singleton(
3226 node.source_iter(q!(vec![1, 2, 3]))
3227 .atomic()
3228 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3229 .snapshot_atomic(&tick, nondet!(/** test */)),
3230 )
3231 .all_ticks()
3232 .send_bincode_external(&external);
3233
3234 let nodes = flow
3235 .with_process(&node, deployment.Localhost())
3236 .with_external(&external, deployment.Localhost())
3237 .deploy(&mut deployment);
3238
3239 deployment.deploy().await.unwrap();
3240
3241 let mut external_in = nodes.connect(input_port).await;
3242 let mut external_out = nodes.connect(out).await;
3243
3244 deployment.start().await.unwrap();
3245
3246 external_in.send(1).await.unwrap();
3247 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3248
3249 external_in.send(2).await.unwrap();
3250 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3251 }
3252
3253 #[cfg(feature = "deploy")]
3254 #[tokio::test]
3255 async fn unbounded_scan_remembers_state() {
3256 let mut deployment = Deployment::new();
3257
3258 let mut flow = FlowBuilder::new();
3259 let node = flow.process::<()>();
3260 let external = flow.external::<()>();
3261
3262 let (input_port, input) = node.source_external_bincode(&external);
3263 let out = input
3264 .scan(
3265 q!(|| 0),
3266 q!(|acc, v| {
3267 *acc += v;
3268 Some(*acc)
3269 }),
3270 )
3271 .send_bincode_external(&external);
3272
3273 let nodes = flow
3274 .with_process(&node, deployment.Localhost())
3275 .with_external(&external, deployment.Localhost())
3276 .deploy(&mut deployment);
3277
3278 deployment.deploy().await.unwrap();
3279
3280 let mut external_in = nodes.connect(input_port).await;
3281 let mut external_out = nodes.connect(out).await;
3282
3283 deployment.start().await.unwrap();
3284
3285 external_in.send(1).await.unwrap();
3286 assert_eq!(external_out.next().await.unwrap(), 1);
3287
3288 external_in.send(2).await.unwrap();
3289 assert_eq!(external_out.next().await.unwrap(), 3);
3290 }
3291
3292 #[cfg(feature = "deploy")]
3293 #[tokio::test]
3294 async fn unbounded_enumerate_remembers_state() {
3295 let mut deployment = Deployment::new();
3296
3297 let mut flow = FlowBuilder::new();
3298 let node = flow.process::<()>();
3299 let external = flow.external::<()>();
3300
3301 let (input_port, input) = node.source_external_bincode(&external);
3302 let out = input.enumerate().send_bincode_external(&external);
3303
3304 let nodes = flow
3305 .with_process(&node, deployment.Localhost())
3306 .with_external(&external, deployment.Localhost())
3307 .deploy(&mut deployment);
3308
3309 deployment.deploy().await.unwrap();
3310
3311 let mut external_in = nodes.connect(input_port).await;
3312 let mut external_out = nodes.connect(out).await;
3313
3314 deployment.start().await.unwrap();
3315
3316 external_in.send(1).await.unwrap();
3317 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3318
3319 external_in.send(2).await.unwrap();
3320 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3321 }
3322
3323 #[cfg(feature = "deploy")]
3324 #[tokio::test]
3325 async fn unbounded_unique_remembers_state() {
3326 let mut deployment = Deployment::new();
3327
3328 let mut flow = FlowBuilder::new();
3329 let node = flow.process::<()>();
3330 let external = flow.external::<()>();
3331
3332 let (input_port, input) =
3333 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3334 let out = input.unique().send_bincode_external(&external);
3335
3336 let nodes = flow
3337 .with_process(&node, deployment.Localhost())
3338 .with_external(&external, deployment.Localhost())
3339 .deploy(&mut deployment);
3340
3341 deployment.deploy().await.unwrap();
3342
3343 let mut external_in = nodes.connect(input_port).await;
3344 let mut external_out = nodes.connect(out).await;
3345
3346 deployment.start().await.unwrap();
3347
3348 external_in.send(1).await.unwrap();
3349 assert_eq!(external_out.next().await.unwrap(), 1);
3350
3351 external_in.send(2).await.unwrap();
3352 assert_eq!(external_out.next().await.unwrap(), 2);
3353
3354 external_in.send(1).await.unwrap();
3355 external_in.send(3).await.unwrap();
3356 assert_eq!(external_out.next().await.unwrap(), 3);
3357 }
3358
3359 #[cfg(feature = "sim")]
3360 #[test]
3361 #[should_panic]
3362 fn sim_batch_nondet_size() {
3363 let mut flow = FlowBuilder::new();
3364 let node = flow.process::<()>();
3365
3366 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3367
3368 let tick = node.tick();
3369 let out_recv = input
3370 .batch(&tick, nondet!(/** test */))
3371 .count()
3372 .all_ticks()
3373 .sim_output();
3374
3375 flow.sim().exhaustive(async || {
3376 in_send.send(());
3377 in_send.send(());
3378 in_send.send(());
3379
3380 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3381 });
3382 }
3383
3384 #[cfg(feature = "sim")]
3385 #[test]
3386 fn sim_batch_preserves_order() {
3387 let mut flow = FlowBuilder::new();
3388 let node = flow.process::<()>();
3389
3390 let (in_send, input) = node.sim_input();
3391
3392 let tick = node.tick();
3393 let out_recv = input
3394 .batch(&tick, nondet!(/** test */))
3395 .all_ticks()
3396 .sim_output();
3397
3398 flow.sim().exhaustive(async || {
3399 in_send.send(1);
3400 in_send.send(2);
3401 in_send.send(3);
3402
3403 out_recv.assert_yields_only([1, 2, 3]).await;
3404 });
3405 }
3406
3407 #[cfg(feature = "sim")]
3408 #[test]
3409 #[should_panic]
3410 fn sim_batch_unordered_shuffles() {
3411 let mut flow = FlowBuilder::new();
3412 let node = flow.process::<()>();
3413
3414 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3415
3416 let tick = node.tick();
3417 let batch = input.batch(&tick, nondet!(/** test */));
3418 let out_recv = batch
3419 .clone()
3420 .min()
3421 .zip(batch.max())
3422 .all_ticks()
3423 .sim_output();
3424
3425 flow.sim().exhaustive(async || {
3426 in_send.send_many_unordered([1, 2, 3]);
3427
3428 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3429 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3430 }
3431 });
3432 }
3433
3434 #[cfg(feature = "sim")]
3435 #[test]
3436 fn sim_batch_unordered_shuffles_count() {
3437 let mut flow = FlowBuilder::new();
3438 let node = flow.process::<()>();
3439
3440 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3441
3442 let tick = node.tick();
3443 let batch = input.batch(&tick, nondet!(/** test */));
3444 let out_recv = batch.all_ticks().sim_output();
3445
3446 let instance_count = flow.sim().exhaustive(async || {
3447 in_send.send_many_unordered([1, 2, 3, 4]);
3448 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3449 });
3450
3451 assert_eq!(
3452 instance_count,
3453 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3454 )
3455 }
3456
3457 #[cfg(feature = "sim")]
3458 #[test]
3459 #[should_panic]
3460 fn sim_observe_order_batched() {
3461 let mut flow = FlowBuilder::new();
3462 let node = flow.process::<()>();
3463
3464 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3465
3466 let tick = node.tick();
3467 let batch = input.batch(&tick, nondet!(/** test */));
3468 let out_recv = batch
3469 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3470 .all_ticks()
3471 .sim_output();
3472
3473 flow.sim().exhaustive(async || {
3474 in_send.send_many_unordered([1, 2, 3, 4]);
3475 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3476 });
3477 }
3478
3479 #[cfg(feature = "sim")]
3480 #[test]
3481 fn sim_observe_order_batched_count() {
3482 let mut flow = FlowBuilder::new();
3483 let node = flow.process::<()>();
3484
3485 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3486
3487 let tick = node.tick();
3488 let batch = input.batch(&tick, nondet!(/** test */));
3489 let out_recv = batch
3490 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3491 .all_ticks()
3492 .sim_output();
3493
3494 let instance_count = flow.sim().exhaustive(async || {
3495 in_send.send_many_unordered([1, 2, 3, 4]);
3496 let _ = out_recv.collect::<Vec<_>>().await;
3497 });
3498
3499 assert_eq!(
3500 instance_count,
3501 192 // 4! * 2^{4 - 1}
3502 )
3503 }
3504
3505 #[cfg(feature = "sim")]
3506 #[test]
3507 fn sim_unordered_count_instance_count() {
3508 let mut flow = FlowBuilder::new();
3509 let node = flow.process::<()>();
3510
3511 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3512
3513 let tick = node.tick();
3514 let out_recv = input
3515 .count()
3516 .snapshot(&tick, nondet!(/** test */))
3517 .all_ticks()
3518 .sim_output();
3519
3520 let instance_count = flow.sim().exhaustive(async || {
3521 in_send.send_many_unordered([1, 2, 3, 4]);
3522 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3523 });
3524
3525 assert_eq!(
3526 instance_count,
3527 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3528 )
3529 }
3530
3531 #[cfg(feature = "sim")]
3532 #[test]
3533 fn sim_top_level_assume_ordering() {
3534 let mut flow = FlowBuilder::new();
3535 let node = flow.process::<()>();
3536
3537 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3538
3539 let out_recv = input
3540 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3541 .sim_output();
3542
3543 let instance_count = flow.sim().exhaustive(async || {
3544 in_send.send_many_unordered([1, 2, 3]);
3545 let mut out = out_recv.collect::<Vec<_>>().await;
3546 out.sort();
3547 assert_eq!(out, vec![1, 2, 3]);
3548 });
3549
3550 assert_eq!(instance_count, 6)
3551 }
3552
3553 #[cfg(feature = "sim")]
3554 #[test]
3555 fn sim_top_level_assume_ordering_cycle_back() {
3556 let mut flow = FlowBuilder::new();
3557 let node = flow.process::<()>();
3558
3559 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3560
3561 let (complete_cycle_back, cycle_back) =
3562 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3563 let ordered = input
3564 .merge_unordered(cycle_back)
3565 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3566 complete_cycle_back.complete(
3567 ordered
3568 .clone()
3569 .map(q!(|v| v + 1))
3570 .filter(q!(|v| v % 2 == 1)),
3571 );
3572
3573 let out_recv = ordered.sim_output();
3574
3575 let mut saw = false;
3576 let instance_count = flow.sim().exhaustive(async || {
3577 in_send.send_many_unordered([0, 2]);
3578 let out = out_recv.collect::<Vec<_>>().await;
3579
3580 if out.starts_with(&[0, 1, 2]) {
3581 saw = true;
3582 }
3583 });
3584
3585 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3586 assert_eq!(instance_count, 6)
3587 }
3588
3589 #[cfg(feature = "sim")]
3590 #[test]
3591 fn sim_top_level_assume_ordering_cycle_back_tick() {
3592 let mut flow = FlowBuilder::new();
3593 let node = flow.process::<()>();
3594
3595 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3596
3597 let (complete_cycle_back, cycle_back) =
3598 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3599 let ordered = input
3600 .merge_unordered(cycle_back)
3601 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3602 complete_cycle_back.complete(
3603 ordered
3604 .clone()
3605 .batch(&node.tick(), nondet!(/** test */))
3606 .all_ticks()
3607 .map(q!(|v| v + 1))
3608 .filter(q!(|v| v % 2 == 1)),
3609 );
3610
3611 let out_recv = ordered.sim_output();
3612
3613 let mut saw = false;
3614 let instance_count = flow.sim().exhaustive(async || {
3615 in_send.send_many_unordered([0, 2]);
3616 let out = out_recv.collect::<Vec<_>>().await;
3617
3618 if out.starts_with(&[0, 1, 2]) {
3619 saw = true;
3620 }
3621 });
3622
3623 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3624 assert_eq!(instance_count, 58)
3625 }
3626
3627 #[cfg(feature = "sim")]
3628 #[test]
3629 fn sim_top_level_assume_ordering_multiple() {
3630 let mut flow = FlowBuilder::new();
3631 let node = flow.process::<()>();
3632
3633 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3634 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3635
3636 let (complete_cycle_back, cycle_back) =
3637 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3638 let input1_ordered = input
3639 .clone()
3640 .merge_unordered(cycle_back)
3641 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3642 let foo = input1_ordered
3643 .clone()
3644 .map(q!(|v| v + 3))
3645 .weaken_ordering::<NoOrder>()
3646 .merge_unordered(input2)
3647 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3648
3649 complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3650
3651 let out_recv = input1_ordered.sim_output();
3652
3653 let mut saw = false;
3654 let instance_count = flow.sim().exhaustive(async || {
3655 in_send.send_many_unordered([0, 1]);
3656 let out = out_recv.collect::<Vec<_>>().await;
3657
3658 if out.starts_with(&[0, 3, 1]) {
3659 saw = true;
3660 }
3661 });
3662
3663 assert!(saw, "did not see an instance with 0, 3, 1 in order");
3664 assert_eq!(instance_count, 24)
3665 }
3666
3667 #[cfg(feature = "sim")]
3668 #[test]
3669 fn sim_atomic_assume_ordering_cycle_back() {
3670 let mut flow = FlowBuilder::new();
3671 let node = flow.process::<()>();
3672
3673 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3674
3675 let (complete_cycle_back, cycle_back) =
3676 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3677 let ordered = input
3678 .merge_unordered(cycle_back)
3679 .atomic()
3680 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3681 .end_atomic();
3682 complete_cycle_back.complete(
3683 ordered
3684 .clone()
3685 .map(q!(|v| v + 1))
3686 .filter(q!(|v| v % 2 == 1)),
3687 );
3688
3689 let out_recv = ordered.sim_output();
3690
3691 let instance_count = flow.sim().exhaustive(async || {
3692 in_send.send_many_unordered([0, 2]);
3693 let out = out_recv.collect::<Vec<_>>().await;
3694 assert_eq!(out.len(), 4);
3695 });
3696
3697 assert_eq!(instance_count, 22)
3698 }
3699
3700 #[cfg(feature = "deploy")]
3701 #[tokio::test]
3702 async fn partition_evens_odds() {
3703 let mut deployment = Deployment::new();
3704
3705 let mut flow = FlowBuilder::new();
3706 let node = flow.process::<()>();
3707 let external = flow.external::<()>();
3708
3709 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3710 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3711 let evens_port = evens.send_bincode_external(&external);
3712 let odds_port = odds.send_bincode_external(&external);
3713
3714 let nodes = flow
3715 .with_process(&node, deployment.Localhost())
3716 .with_external(&external, deployment.Localhost())
3717 .deploy(&mut deployment);
3718
3719 deployment.deploy().await.unwrap();
3720
3721 let mut evens_out = nodes.connect(evens_port).await;
3722 let mut odds_out = nodes.connect(odds_port).await;
3723
3724 deployment.start().await.unwrap();
3725
3726 let mut even_results = Vec::new();
3727 for _ in 0..3 {
3728 even_results.push(evens_out.next().await.unwrap());
3729 }
3730 even_results.sort();
3731 assert_eq!(even_results, vec![2, 4, 6]);
3732
3733 let mut odd_results = Vec::new();
3734 for _ in 0..3 {
3735 odd_results.push(odds_out.next().await.unwrap());
3736 }
3737 odd_results.sort();
3738 assert_eq!(odd_results, vec![1, 3, 5]);
3739 }
3740
3741 #[cfg(feature = "deploy")]
3742 #[tokio::test]
3743 async fn unconsumed_inspect_still_runs() {
3744 use crate::deploy::DeployCrateWrapper;
3745
3746 let mut deployment = Deployment::new();
3747
3748 let mut flow = FlowBuilder::new();
3749 let node = flow.process::<()>();
3750
3751 // The return value of .inspect() is intentionally dropped.
3752 // Before the Null-root fix, this would silently do nothing.
3753 node.source_iter(q!(0..5))
3754 .inspect(q!(|x| println!("inspect: {}", x)));
3755
3756 let nodes = flow
3757 .with_process(&node, deployment.Localhost())
3758 .deploy(&mut deployment);
3759
3760 deployment.deploy().await.unwrap();
3761
3762 let mut stdout = nodes.get_process(&node).stdout();
3763
3764 deployment.start().await.unwrap();
3765
3766 let mut lines = Vec::new();
3767 for _ in 0..5 {
3768 lines.push(stdout.recv().await.unwrap());
3769 }
3770 lines.sort();
3771 assert_eq!(
3772 lines,
3773 vec![
3774 "inspect: 0",
3775 "inspect: 1",
3776 "inspect: 2",
3777 "inspect: 3",
3778 "inspect: 4",
3779 ]
3780 );
3781 }
3782
3783 #[cfg(feature = "sim")]
3784 #[test]
3785 fn sim_limit() {
3786 let mut flow = FlowBuilder::new();
3787 let node = flow.process::<()>();
3788
3789 let (in_send, input) = node.sim_input();
3790
3791 let out_recv = input.limit(q!(3)).sim_output();
3792
3793 flow.sim().exhaustive(async || {
3794 in_send.send(1);
3795 in_send.send(2);
3796 in_send.send(3);
3797 in_send.send(4);
3798 in_send.send(5);
3799
3800 out_recv.assert_yields_only([1, 2, 3]).await;
3801 });
3802 }
3803
3804 #[cfg(feature = "sim")]
3805 #[test]
3806 fn sim_limit_zero() {
3807 let mut flow = FlowBuilder::new();
3808 let node = flow.process::<()>();
3809
3810 let (in_send, input) = node.sim_input();
3811
3812 let out_recv = input.limit(q!(0)).sim_output();
3813
3814 flow.sim().exhaustive(async || {
3815 in_send.send(1);
3816 in_send.send(2);
3817
3818 out_recv.assert_yields_only::<i32, _>([]).await;
3819 });
3820 }
3821
3822 #[cfg(feature = "deploy")]
3823 #[tokio::test]
3824 async fn monotone_fold_threshold() {
3825 use crate::properties::manual_proof;
3826
3827 let mut deployment = Deployment::new();
3828
3829 let mut flow = FlowBuilder::new();
3830 let node = flow.process::<()>();
3831 let external = flow.external::<()>();
3832
3833 let in_unbounded: super::Stream<_, _> =
3834 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3835 let sum = in_unbounded.fold(
3836 q!(|| 0),
3837 q!(
3838 |sum, v| {
3839 *sum += v;
3840 },
3841 monotone = manual_proof!(/** test */)
3842 ),
3843 );
3844
3845 let threshold_out = sum
3846 .threshold_greater_or_equal(node.singleton(q!(7)))
3847 .send_bincode_external(&external);
3848
3849 let nodes = flow
3850 .with_process(&node, deployment.Localhost())
3851 .with_external(&external, deployment.Localhost())
3852 .deploy(&mut deployment);
3853
3854 deployment.deploy().await.unwrap();
3855
3856 let mut threshold_out = nodes.connect(threshold_out).await;
3857
3858 deployment.start().await.unwrap();
3859
3860 assert_eq!(threshold_out.next().await.unwrap(), 7);
3861 }
3862
3863 #[cfg(feature = "deploy")]
3864 #[tokio::test]
3865 async fn monotone_count_threshold() {
3866 let mut deployment = Deployment::new();
3867
3868 let mut flow = FlowBuilder::new();
3869 let node = flow.process::<()>();
3870 let external = flow.external::<()>();
3871
3872 let in_unbounded: super::Stream<_, _> =
3873 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3874 let sum = in_unbounded.count();
3875
3876 let threshold_out = sum
3877 .threshold_greater_or_equal(node.singleton(q!(3)))
3878 .send_bincode_external(&external);
3879
3880 let nodes = flow
3881 .with_process(&node, deployment.Localhost())
3882 .with_external(&external, deployment.Localhost())
3883 .deploy(&mut deployment);
3884
3885 deployment.deploy().await.unwrap();
3886
3887 let mut threshold_out = nodes.connect(threshold_out).await;
3888
3889 deployment.start().await.unwrap();
3890
3891 assert_eq!(threshold_out.next().await.unwrap(), 3);
3892 }
3893}