Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds pin down how orderings combine: the weakest of
/// `Self` and itself (or [`TotalOrder`]) is `Self`, while combining with
/// [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited type: it is used only as a type-level marker and
/// never instantiated.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited type: it is used only as a type-level marker and
/// never instantiated.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the weakest
// of the two orderings is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82    /// The weaker of the two orderings.
83    type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88    type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93    type Min = NoOrder;
94}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds pin down how retry guarantees combine: the weakest of
/// `Self` and itself (or [`ExactlyOnce`]) is `Self`, while combining with
/// [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited type: it is used only as a type-level marker and
/// never instantiated.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited type: it is used only as a type-level marker and
/// never instantiated.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the weakest
// of the two retry guarantees is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The bounds record that the result is, by construction, safe to convert
    /// to from either of the two inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the weakest of it and any `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so it absorbs anything it is combined with.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps compiler suggestions from pointing at this impl,
// so the custom `on_unimplemented` message above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}

#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The dataflow IR node that produces this stream. Operators that consume
    /// the stream take the node out via `replace`, leaving a
    /// `HydroNode::Placeholder` behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow being built, used to register roots (sinks).
    pub(crate) flow_state: FlowState,

    // Carries the type parameters that are not stored as runtime data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
    /// Registers an unconsumed IR node as a `HydroRoot::Null` sink so the
    /// upstream computation remains attached to the flow graph.
    fn drop(&mut self) {
        // Take the node out; if an operator already consumed this stream, a
        // `Placeholder` was left behind and there is nothing to register.
        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
        // Shared (tee'd) nodes are skipped: another handle still owns them.
        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
                input: Box::new(ir_node),
                op_metadata: HydroIrOpMetadata::new(),
            });
        }
    }
}
220
// A bounded stream can always be treated as unbounded: the conversion is a
// pure metadata re-tag, expressed in the IR as a `Cast` node.
impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
    for Stream<T, L, Unbounded, O, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
        // Metadata for the same node viewed at the weaker (unbounded) type.
        let new_meta = stream
            .location
            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());

        Stream {
            location: stream.location.clone(),
            flow_state: stream.flow_state.clone(),
            ir_node: RefCell::new(HydroNode::Cast {
                // Take the node out of the source stream so its `Drop` sees a placeholder.
                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
                metadata: new_meta,
            }),
            _phantom: PhantomData,
        }
    }
}
242
// Weakening the ordering guarantee (`TotalOrder` -> `NoOrder`) is always safe.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}

// Weakening the retries guarantee (`ExactlyOnce` -> `AtLeastOnce`) is always safe.
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
262
// Bounded tick-scoped streams can defer their contents to the next tick;
// this simply delegates to the inherent `Stream::defer_tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
271
impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the receiving end of a tick-scoped cycle: a stream backed by a
    /// `CycleSource` IR node identified by `cycle_id`.
    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
289
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the receiving end of a tick-scoped cycle that is seeded with
    /// `initial` on the first tick, then carries the values sent in the
    /// previous tick on every subsequent tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Wrap the cycle source in `DeferTick` so values written to the cycle
        // become visible one tick later.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // `optional_first_tick` is present only on the very first tick, so
        // `initial` contributes exactly once.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
312
impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Closes a tick-scoped cycle by registering this stream as the
    /// `CycleSink` that feeds the matching `CycleSource`.
    ///
    /// # Panics
    /// Panics if this stream's location does not match `expected_location`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                // Take the node out so this stream's `Drop` sees a placeholder.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
334
// Same as the `TickCycle` source above, but for forward references at
// non-tick locations, where any boundedness is allowed.
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    /// Creates the receiving end of a forward reference: a stream backed by a
    /// `CycleSource` IR node identified by `cycle_id`.
    fn create_source(cycle_id: CycleId, location: L) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
352
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Closes a forward reference by registering this stream as the
    /// `CycleSink` that feeds the matching `CycleSource`.
    ///
    /// # Panics
    /// Panics if this stream's location does not match `expected_location`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                // Take the node out so this stream's `Drop` sees a placeholder.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
374
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    /// Clones the stream by tee-ing its IR node, so both handles share a
    /// single upstream computation instead of duplicating it.
    fn clone(&self) -> Self {
        // On the first clone, convert this stream's node into a `Tee` wrapping
        // the original node behind a shared `Rc`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // Now guaranteed to be a `Tee`; the clone gets its own `Tee` node
        // pointing at the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
405
406impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
407where
408    L: Location<'a>,
409{
    /// Creates a [`Stream`] wrapping `ir_node`, materialized at `location`.
    ///
    /// In debug builds, asserts that the node's metadata matches both the
    /// location and this stream type's [`CollectionKind`].
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        let flow_state = location.flow_state().clone();
        Stream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
422
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }

    /// Builds the [`CollectionKind`] describing this stream type — its
    /// boundedness, ordering, retries, and element type — for use in IR
    /// node metadata.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
436
    /// Produces a stream based on invoking `f` on each element.
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`Stream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["hello", "world"]));
    /// words.map(q!(|x| x.to_uppercase()))
    /// # }, |mut stream| async move {
    /// # for w in vec!["HELLO", "WORLD"] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> U + 'a,
    {
        // Splice the staged closure into an expression for this location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Map {
                f,
                // Take ownership of the upstream node, leaving a placeholder
                // so `self`'s `Drop` does not register a `Null` root.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
472
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        // Splice the staged closure into an expression for this location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // The `O` ordering guarantee is preserved: the caller promises
                // the produced iterators are deterministic.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
514
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Same `FlatMap` IR node as `flat_map_ordered`; only the
                // output type's ordering guarantee is weakened to `NoOrder`.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
561
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flat-map with the staged identity closure.
        self.flat_map_ordered(q!(|d| d))
    }
590
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Flat-map with the staged identity closure.
        self.flat_map_unordered(q!(|d| d))
    }
623
    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
    /// then emit the elements of that stream one by one. When the inner stream yields
    /// `Pending`, this operator yields as well.
    pub fn flat_map_stream_blocking<U, S, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, O, R>
    where
        S: futures::Stream<Item = U>,
        F: Fn(T) -> S + 'a,
    {
        // Splice the staged closure into an expression for this location.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMapStreamBlocking {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Ordering and retries guarantees carry through unchanged.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
647
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Flat-map with the staged identity closure.
        self.flat_map_stream_blocking(q!(|d| d))
    }
657
    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
    /// `f`, preserving the order of the elements.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`Stream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .filter(q!(|&x| x > 2))
    /// # }, |mut stream| async move {
    /// // 3, 4
    /// # for w in (3..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&T) -> bool + 'a,
    {
        // Borrowing splice variant, since the predicate takes `&T`.
        let f = f.splice_fn1_borrow_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Filter {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
696
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // Both outputs reference the same shared upstream node, so the input
        // is consumed once and each `Partition` node selects one side of it.
        let shared = SharedNode(Rc::new(RefCell::new(
            self.ir_node.replace(HydroNode::Placeholder),
        )));

        // Elements where the predicate returns `true`.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        // Elements where the predicate returns `false`.
        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
769
    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> Option<U> + 'a,
    {
        // Owning splice variant: the closure consumes `T` and may transform it.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
805
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        // A `Singleton` converts into `Optional` here via the `Into` bound.
        let other: Optional<O2, L, Bounded> = other.into();
        // Both sides must live at the same location; panics otherwise.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
851
852    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
853    ///
854    /// # Example
855    /// ```rust
856    /// # #[cfg(feature = "deploy")] {
857    /// # use hydro_lang::prelude::*;
858    /// # use futures::StreamExt;
859    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860    /// let tick = process.tick();
861    /// // ticks are lazy by default, forces the second tick to run
862    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863    ///
864    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
865    /// let batch_first_tick = process
866    ///   .source_iter(q!(vec![1, 2, 3, 4]))
867    ///   .batch(&tick, nondet!(/** test */));
868    /// let batch_second_tick = process
869    ///   .source_iter(q!(vec![5, 6, 7, 8]))
870    ///   .batch(&tick, nondet!(/** test */))
871    ///   .defer_tick();
872    /// batch_first_tick.chain(batch_second_tick)
873    ///   .filter_if(signal)
874    ///   .all_ticks()
875    /// # }, |mut stream| async move {
876    /// // [1, 2, 3, 4]
877    /// # for w in vec![1, 2, 3, 4] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// # }
882    /// ```
883    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
884        self.cross_singleton(signal.filter(q!(|b| *b)))
885            .map(q!(|(d, _)| d))
886    }
887
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_some(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(signal.is_some())
    }
926
927    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
928    ///
929    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
930    /// some local state.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// let tick = process.tick();
939    /// // ticks are lazy by default, forces the second tick to run
940    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
941    ///
942    /// let batch_first_tick = process
943    ///   .source_iter(q!(vec![1, 2, 3, 4]))
944    ///   .batch(&tick, nondet!(/** test */));
945    /// let batch_second_tick = process
946    ///   .source_iter(q!(vec![5, 6, 7, 8]))
947    ///   .batch(&tick, nondet!(/** test */))
948    ///   .defer_tick(); // appears on the second tick
949    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
950    /// batch_first_tick.chain(batch_second_tick)
951    ///   .filter_if_none(some_on_first_tick)
952    ///   .all_ticks()
953    /// # }, |mut stream| async move {
954    /// // [5, 6, 7, 8]
955    /// # for w in vec![5, 6, 7, 8] {
956    /// #     assert_eq!(stream.next().await.unwrap(), w);
957    /// # }
958    /// # }));
959    /// # }
960    /// ```
961    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
962    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
963        self.filter_if(other.is_none())
964    }
965
966    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
967    /// tupled pairs in a non-deterministic order.
968    ///
969    /// # Example
970    /// ```rust
971    /// # #[cfg(feature = "deploy")] {
972    /// # use hydro_lang::prelude::*;
973    /// # use std::collections::HashSet;
974    /// # use futures::StreamExt;
975    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976    /// let tick = process.tick();
977    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
978    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
979    /// stream1.cross_product(stream2)
980    /// # }, |mut stream| async move {
981    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
982    /// # stream.map(|i| assert!(expected.contains(&i)));
983    /// # }));
984    /// # }
985    /// ```
986    pub fn cross_product<T2, O2: Ordering>(
987        self,
988        other: Stream<T2, L, B, O2, R>,
989    ) -> Stream<(T, T2), L, B, NoOrder, R>
990    where
991        T: Clone,
992        T2: Clone,
993    {
994        check_matching_location(&self.location, &other.location);
995
996        Stream::new(
997            self.location.clone(),
998            HydroNode::CrossProduct {
999                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1000                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1001                metadata: self
1002                    .location
1003                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
1004            },
1005        )
1006    }
1007
1008    /// Takes one stream as input and filters out any duplicate occurrences. The output
1009    /// contains all unique values from the input.
1010    ///
1011    /// # Example
1012    /// ```rust
1013    /// # #[cfg(feature = "deploy")] {
1014    /// # use hydro_lang::prelude::*;
1015    /// # use futures::StreamExt;
1016    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1017    /// let tick = process.tick();
1018    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1019    /// # }, |mut stream| async move {
1020    /// # for w in vec![1, 2, 3, 4] {
1021    /// #     assert_eq!(stream.next().await.unwrap(), w);
1022    /// # }
1023    /// # }));
1024    /// # }
1025    /// ```
1026    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1027    where
1028        T: Eq + Hash,
1029    {
1030        Stream::new(
1031            self.location.clone(),
1032            HydroNode::Unique {
1033                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1034                metadata: self
1035                    .location
1036                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1037            },
1038        )
1039    }
1040
1041    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1042    ///
1043    /// The `other` stream must be [`Bounded`], since this function will wait until
1044    /// all its elements are available before producing any output.
1045    /// # Example
1046    /// ```rust
1047    /// # #[cfg(feature = "deploy")] {
1048    /// # use hydro_lang::prelude::*;
1049    /// # use futures::StreamExt;
1050    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1051    /// let tick = process.tick();
1052    /// let stream = process
1053    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1054    ///   .batch(&tick, nondet!(/** test */));
1055    /// let batch = process
1056    ///   .source_iter(q!(vec![1, 2]))
1057    ///   .batch(&tick, nondet!(/** test */));
1058    /// stream.filter_not_in(batch).all_ticks()
1059    /// # }, |mut stream| async move {
1060    /// # for w in vec![3, 4] {
1061    /// #     assert_eq!(stream.next().await.unwrap(), w);
1062    /// # }
1063    /// # }));
1064    /// # }
1065    /// ```
1066    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1067    where
1068        T: Eq + Hash,
1069        B2: IsBounded,
1070    {
1071        check_matching_location(&self.location, &other.location);
1072
1073        Stream::new(
1074            self.location.clone(),
1075            HydroNode::Difference {
1076                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1077                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1078                metadata: self
1079                    .location
1080                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1081            },
1082        )
1083    }
1084
1085    /// An operator which allows you to "inspect" each element of a stream without
1086    /// modifying it. The closure `f` is called on a reference to each item. This is
1087    /// mainly useful for debugging, and should not be used to generate side-effects.
1088    ///
1089    /// # Example
1090    /// ```rust
1091    /// # #[cfg(feature = "deploy")] {
1092    /// # use hydro_lang::prelude::*;
1093    /// # use futures::StreamExt;
1094    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095    /// let nums = process.source_iter(q!(vec![1, 2]));
1096    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1097    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1098    /// # }, |mut stream| async move {
1099    /// # for w in vec![1, 2] {
1100    /// #     assert_eq!(stream.next().await.unwrap(), w);
1101    /// # }
1102    /// # }));
1103    /// # }
1104    /// ```
1105    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1106    where
1107        F: Fn(&T) + 'a,
1108    {
1109        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1110
1111        Stream::new(
1112            self.location.clone(),
1113            HydroNode::Inspect {
1114                f,
1115                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1116                metadata: self.location.new_node_metadata(Self::collection_kind()),
1117            },
1118        )
1119    }
1120
1121    /// Executes the provided closure for every element in this stream.
1122    ///
1123    /// Because the closure may have side effects, the stream must have deterministic order
1124    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1125    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1126    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1127    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1128    where
1129        O: IsOrdered,
1130        R: IsExactlyOnce,
1131    {
1132        let f = f.splice_fn1_ctx(&self.location).into();
1133        self.location
1134            .flow_state()
1135            .borrow_mut()
1136            .push_root(HydroRoot::ForEach {
1137                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138                f,
1139                op_metadata: HydroIrOpMetadata::new(),
1140            });
1141    }
1142
1143    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1144    /// TCP socket to some other server. You should _not_ use this API for interacting with
1145    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1146    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1147    /// interaction with asynchronous sinks.
1148    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1149    where
1150        O: IsOrdered,
1151        R: IsExactlyOnce,
1152        S: 'a + futures::Sink<T> + Unpin,
1153    {
1154        self.location
1155            .flow_state()
1156            .borrow_mut()
1157            .push_root(HydroRoot::DestSink {
1158                sink: sink.splice_typed_ctx(&self.location).into(),
1159                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1160                op_metadata: HydroIrOpMetadata::new(),
1161            });
1162    }
1163
1164    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1165    ///
1166    /// # Example
1167    /// ```rust
1168    /// # #[cfg(feature = "deploy")] {
1169    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1170    /// # use futures::StreamExt;
1171    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1172    /// let tick = process.tick();
1173    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1174    /// numbers.enumerate()
1175    /// # }, |mut stream| async move {
1176    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1177    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1178    /// #     assert_eq!(stream.next().await.unwrap(), w);
1179    /// # }
1180    /// # }));
1181    /// # }
1182    /// ```
1183    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1184    where
1185        O: IsOrdered,
1186        R: IsExactlyOnce,
1187    {
1188        Stream::new(
1189            self.location.clone(),
1190            HydroNode::Enumerate {
1191                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1192                metadata: self.location.new_node_metadata(Stream::<
1193                    (usize, T),
1194                    L,
1195                    B,
1196                    TotalOrder,
1197                    ExactlyOnce,
1198                >::collection_kind()),
1199            },
1200        )
1201    }
1202
1203    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1204    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1205    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1206    ///
1207    /// Depending on the input stream guarantees, the closure may need to be commutative
1208    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1209    ///
1210    /// # Example
1211    /// ```rust
1212    /// # #[cfg(feature = "deploy")] {
1213    /// # use hydro_lang::prelude::*;
1214    /// # use futures::StreamExt;
1215    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1217    /// words
1218    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1219    ///     .into_stream()
1220    /// # }, |mut stream| async move {
1221    /// // "HELLOWORLD"
1222    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1223    /// # }));
1224    /// # }
1225    /// ```
1226    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1227        self,
1228        init: impl IntoQuotedMut<'a, I, L>,
1229        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1230    ) -> Singleton<A, L, B2>
1231    where
1232        I: Fn() -> A + 'a,
1233        F: Fn(&mut A, T),
1234        C: ValidCommutativityFor<O>,
1235        Idemp: ValidIdempotenceFor<R>,
1236        B: ApplyMonotoneStream<M, B2>,
1237    {
1238        let init = init.splice_fn0_ctx(&self.location).into();
1239        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1240        proof.register_proof(&comb);
1241
1242        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1243        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1244
1245        let core = HydroNode::Fold {
1246            init,
1247            acc: comb.into(),
1248            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1249            metadata: ordered_etc
1250                .location
1251                .new_node_metadata(Singleton::<A, L, B2>::collection_kind()),
1252        };
1253
1254        Singleton::new(ordered_etc.location.clone(), core)
1255    }
1256
1257    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260    /// reference, so that it can be modified in place.
1261    ///
1262    /// Depending on the input stream guarantees, the closure may need to be commutative
1263    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1264    ///
1265    /// # Example
1266    /// ```rust
1267    /// # #[cfg(feature = "deploy")] {
1268    /// # use hydro_lang::prelude::*;
1269    /// # use futures::StreamExt;
1270    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271    /// let bools = process.source_iter(q!(vec![false, true, false]));
1272    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1273    /// # }, |mut stream| async move {
1274    /// // true
1275    /// # assert_eq!(stream.next().await.unwrap(), true);
1276    /// # }));
1277    /// # }
1278    /// ```
1279    pub fn reduce<F, C, Idemp>(
1280        self,
1281        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1282    ) -> Optional<T, L, B>
1283    where
1284        F: Fn(&mut T, T) + 'a,
1285        C: ValidCommutativityFor<O>,
1286        Idemp: ValidIdempotenceFor<R>,
1287    {
1288        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1289        proof.register_proof(&f);
1290
1291        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1292        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1293
1294        let core = HydroNode::Reduce {
1295            f: f.into(),
1296            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1297            metadata: ordered_etc
1298                .location
1299                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1300        };
1301
1302        Optional::new(ordered_etc.location.clone(), core)
1303    }
1304
1305    /// Computes the maximum element in the stream as an [`Optional`], which
1306    /// will be empty until the first element in the input arrives.
1307    ///
1308    /// # Example
1309    /// ```rust
1310    /// # #[cfg(feature = "deploy")] {
1311    /// # use hydro_lang::prelude::*;
1312    /// # use futures::StreamExt;
1313    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1314    /// let tick = process.tick();
1315    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1316    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1317    /// batch.max().all_ticks()
1318    /// # }, |mut stream| async move {
1319    /// // 4
1320    /// # assert_eq!(stream.next().await.unwrap(), 4);
1321    /// # }));
1322    /// # }
1323    /// ```
1324    pub fn max(self) -> Optional<T, L, B>
1325    where
1326        T: Ord,
1327    {
1328        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1329            .assume_ordering_trusted_bounded::<TotalOrder>(
1330                nondet!(/** max is commutative, but order affects intermediates */),
1331            )
1332            .reduce(q!(|curr, new| {
1333                if new > *curr {
1334                    *curr = new;
1335                }
1336            }))
1337    }
1338
1339    /// Computes the minimum element in the stream as an [`Optional`], which
1340    /// will be empty until the first element in the input arrives.
1341    ///
1342    /// # Example
1343    /// ```rust
1344    /// # #[cfg(feature = "deploy")] {
1345    /// # use hydro_lang::prelude::*;
1346    /// # use futures::StreamExt;
1347    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348    /// let tick = process.tick();
1349    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1350    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1351    /// batch.min().all_ticks()
1352    /// # }, |mut stream| async move {
1353    /// // 1
1354    /// # assert_eq!(stream.next().await.unwrap(), 1);
1355    /// # }));
1356    /// # }
1357    /// ```
1358    pub fn min(self) -> Optional<T, L, B>
1359    where
1360        T: Ord,
1361    {
1362        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1363            .assume_ordering_trusted_bounded::<TotalOrder>(
1364                nondet!(/** max is commutative, but order affects intermediates */),
1365            )
1366            .reduce(q!(|curr, new| {
1367                if new < *curr {
1368                    *curr = new;
1369                }
1370            }))
1371    }
1372
1373    /// Computes the first element in the stream as an [`Optional`], which
1374    /// will be empty until the first element in the input arrives.
1375    ///
1376    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1377    /// re-ordering of elements may cause the first element to change.
1378    ///
1379    /// # Example
1380    /// ```rust
1381    /// # #[cfg(feature = "deploy")] {
1382    /// # use hydro_lang::prelude::*;
1383    /// # use futures::StreamExt;
1384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1385    /// let tick = process.tick();
1386    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1387    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1388    /// batch.first().all_ticks()
1389    /// # }, |mut stream| async move {
1390    /// // 1
1391    /// # assert_eq!(stream.next().await.unwrap(), 1);
1392    /// # }));
1393    /// # }
1394    /// ```
1395    pub fn first(self) -> Optional<T, L, B>
1396    where
1397        O: IsOrdered,
1398    {
1399        self.make_totally_ordered()
1400            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1401            .reduce(q!(|_, _| {}))
1402    }
1403
1404    /// Computes the last element in the stream as an [`Optional`], which
1405    /// will be empty until an element in the input arrives.
1406    ///
1407    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1408    /// re-ordering of elements may cause the last element to change.
1409    ///
1410    /// # Example
1411    /// ```rust
1412    /// # #[cfg(feature = "deploy")] {
1413    /// # use hydro_lang::prelude::*;
1414    /// # use futures::StreamExt;
1415    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416    /// let tick = process.tick();
1417    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1418    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419    /// batch.last().all_ticks()
1420    /// # }, |mut stream| async move {
1421    /// // 4
1422    /// # assert_eq!(stream.next().await.unwrap(), 4);
1423    /// # }));
1424    /// # }
1425    /// ```
1426    pub fn last(self) -> Optional<T, L, B>
1427    where
1428        O: IsOrdered,
1429    {
1430        self.make_totally_ordered()
1431            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1432            .reduce(q!(|curr, new| *curr = new))
1433    }
1434
    /// Returns a stream containing at most the first `n` elements of the input stream,
    /// preserving the original order. Similar to `LIMIT` in SQL.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
    /// retries, since the result depends on the order and cardinality of elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
    /// numbers.limit(q!(3))
    /// # }, |mut stream| async move {
    /// // 10, 20, 30
    /// # for w in vec![10, 20, 30] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        self.generator(
            // Generator state: the number of elements emitted so far.
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    // Only reachable when `n == 0` (otherwise we `Return` on the n-th
                    // element below): stop without emitting anything.
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        // Emit the n-th element and terminate the generator.
                        Generate::Return(item)
                    } else {
                        // Still under the limit: emit and keep consuming input.
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1481
1482    /// Collects all the elements of this stream into a single [`Vec`] element.
1483    ///
1484    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1485    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1486    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1487    /// the vector at an arbitrary point in time.
1488    ///
1489    /// # Example
1490    /// ```rust
1491    /// # #[cfg(feature = "deploy")] {
1492    /// # use hydro_lang::prelude::*;
1493    /// # use futures::StreamExt;
1494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1495    /// let tick = process.tick();
1496    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1497    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1498    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1499    /// # }, |mut stream| async move {
1500    /// // [ vec![1, 2, 3, 4] ]
1501    /// # for w in vec![vec![1, 2, 3, 4]] {
1502    /// #     assert_eq!(stream.next().await.unwrap(), w);
1503    /// # }
1504    /// # }));
1505    /// # }
1506    /// ```
1507    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1508    where
1509        O: IsOrdered,
1510        R: IsExactlyOnce,
1511    {
1512        self.make_totally_ordered().make_exactly_once().fold(
1513            q!(|| vec![]),
1514            q!(|acc, v| {
1515                acc.push(v);
1516            }),
1517        )
1518    }
1519
1520    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1521    /// and emitting each intermediate result.
1522    ///
1523    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1524    /// containing all intermediate accumulated values. The scan operation can also terminate early
1525    /// by returning `None`.
1526    ///
1527    /// The function takes a mutable reference to the accumulator and the current element, and returns
1528    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1529    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1530    ///
1531    /// # Examples
1532    ///
1533    /// Basic usage - running sum:
1534    /// ```rust
1535    /// # #[cfg(feature = "deploy")] {
1536    /// # use hydro_lang::prelude::*;
1537    /// # use futures::StreamExt;
1538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1540    ///     q!(|| 0),
1541    ///     q!(|acc, x| {
1542    ///         *acc += x;
1543    ///         Some(*acc)
1544    ///     }),
1545    /// )
1546    /// # }, |mut stream| async move {
1547    /// // Output: 1, 3, 6, 10
1548    /// # for w in vec![1, 3, 6, 10] {
1549    /// #     assert_eq!(stream.next().await.unwrap(), w);
1550    /// # }
1551    /// # }));
1552    /// # }
1553    /// ```
1554    ///
1555    /// Early termination example:
1556    /// ```rust
1557    /// # #[cfg(feature = "deploy")] {
1558    /// # use hydro_lang::prelude::*;
1559    /// # use futures::StreamExt;
1560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1562    ///     q!(|| 1),
1563    ///     q!(|state, x| {
1564    ///         *state = *state * x;
1565    ///         if *state > 6 {
1566    ///             None // Terminate the stream
1567    ///         } else {
1568    ///             Some(-*state)
1569    ///         }
1570    ///     }),
1571    /// )
1572    /// # }, |mut stream| async move {
1573    /// // Output: -1, -2, -6
1574    /// # for w in vec![-1, -2, -6] {
1575    /// #     assert_eq!(stream.next().await.unwrap(), w);
1576    /// # }
1577    /// # }));
1578    /// # }
1579    /// ```
1580    pub fn scan<A, U, I, F>(
1581        self,
1582        init: impl IntoQuotedMut<'a, I, L>,
1583        f: impl IntoQuotedMut<'a, F, L>,
1584    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1585    where
1586        O: IsOrdered,
1587        R: IsExactlyOnce,
1588        I: Fn() -> A + 'a,
1589        F: Fn(&mut A, T) -> Option<U> + 'a,
1590    {
1591        let init = init.splice_fn0_ctx(&self.location).into();
1592        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1593
1594        Stream::new(
1595            self.location.clone(),
1596            HydroNode::Scan {
1597                init,
1598                acc: f,
1599                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1600                metadata: self.location.new_node_metadata(
1601                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1602                ),
1603            },
1604        )
1605    }
1606
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Defer splicing via `ManualExpr` so the user closures can be captured
        // *inside* the quoted scan closure below, rather than spliced here.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The generator is implemented on top of `Scan`, which requires the
        // strongest ordering/retry guarantees (enforced by `IsOrdered`/`IsExactlyOnce`).
        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan emits Option<Option<U>>: the outer `None` terminates the scan,
        // `Some(None)` emits nothing for this element (dropped by the flatten
        // below), and `Some(Some(u))` emits `u`.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                // First element observed: lazily run the user's initializer.
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        // Emit the final value, then mark the generator terminated
                        // so the next input ends the scan.
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten `Option<U>` to `U`, dropping the `Some(None)` placeholders
        // produced by `Generate::Continue`.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1719
1720    /// Given a time interval, returns a stream corresponding to samples taken from the
1721    /// stream roughly at that interval. The output will have elements in the same order
1722    /// as the input, but with arbitrary elements skipped between samples. There is also
1723    /// no guarantee on the exact timing of the samples.
1724    ///
1725    /// # Non-Determinism
1726    /// The output stream is non-deterministic in which elements are sampled, since this
1727    /// is controlled by a clock.
1728    pub fn sample_every(
1729        self,
1730        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1731        nondet: NonDet,
1732    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1733    where
1734        L: NoTick + NoAtomic,
1735    {
1736        let samples = self.location.source_interval(interval, nondet);
1737
1738        let tick = self.location.tick();
1739        self.batch(&tick, nondet)
1740            .filter_if(samples.batch(&tick, nondet).first().is_some())
1741            .all_ticks()
1742            .weaken_retries()
1743    }
1744
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the `Instant` of the most recent element (`None` until the first
        // element arrives). Retried deliveries would only refresh the timestamp,
        // so assuming ExactlyOnce here is harmless for the timeout result.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // On each tick, report a timeout if either no element has ever been
        // received, or the last arrival is older than `duration`.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    // Nothing received yet: treat as timed out.
                    Some(())
                }
            }))
            .latest()
    }
1789
1790    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1791    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1792    ///
1793    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1794    /// processed before an acknowledgement is emitted.
1795    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1796        let id = self.location.flow_state().borrow_mut().next_clock_id();
1797        let out_location = Atomic {
1798            tick: Tick {
1799                id,
1800                l: self.location.clone(),
1801            },
1802        };
1803        Stream::new(
1804            out_location.clone(),
1805            HydroNode::BeginAtomic {
1806                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1807                metadata: out_location
1808                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1809            },
1810        )
1811    }
1812
1813    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1814    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1815    /// the order of the input. The output stream will execute in the [`Tick`] that was
1816    /// used to create the atomic section.
1817    ///
1818    /// # Non-Determinism
1819    /// The batch boundaries are non-deterministic and may change across executions.
1820    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1821        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1822        Stream::new(
1823            tick.clone(),
1824            HydroNode::Batch {
1825                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1826                metadata: tick
1827                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1828            },
1829        )
1830    }
1831
1832    /// An operator which allows you to "name" a `HydroNode`.
1833    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1834    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1835        {
1836            let mut node = self.ir_node.borrow_mut();
1837            let metadata = node.metadata_mut();
1838            metadata.tag = Some(name.to_owned());
1839        }
1840        self
1841    }
1842
1843    /// Explicitly "casts" the stream to a type with a different ordering
1844    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1845    /// by the type-system.
1846    ///
1847    /// # Non-Determinism
1848    /// This function is used as an escape hatch, and any mistakes in the
1849    /// provided ordering guarantee will propagate into the guarantees
1850    /// for the rest of the program.
1851    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1852        if O::ORDERING_KIND == O2::ORDERING_KIND {
1853            Stream::new(
1854                self.location.clone(),
1855                self.ir_node.replace(HydroNode::Placeholder),
1856            )
1857        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1858            // We can always weaken the ordering guarantee
1859            Stream::new(
1860                self.location.clone(),
1861                HydroNode::Cast {
1862                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1863                    metadata: self
1864                        .location
1865                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1866                },
1867            )
1868        } else {
1869            Stream::new(
1870                self.location.clone(),
1871                HydroNode::ObserveNonDet {
1872                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1873                    trusted: false,
1874                    metadata: self
1875                        .location
1876                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1877                },
1878            )
1879        }
1880    }
1881
1882    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1883    // intermediate states will not be revealed
1884    fn assume_ordering_trusted_bounded<O2: Ordering>(
1885        self,
1886        nondet: NonDet,
1887    ) -> Stream<T, L, B, O2, R> {
1888        if B::BOUNDED {
1889            self.assume_ordering_trusted(nondet)
1890        } else {
1891            self.assume_ordering(nondet)
1892        }
1893    }
1894
1895    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1896    // is not observable
1897    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1898        self,
1899        _nondet: NonDet,
1900    ) -> Stream<T, L, B, O2, R> {
1901        if O::ORDERING_KIND == O2::ORDERING_KIND {
1902            Stream::new(
1903                self.location.clone(),
1904                self.ir_node.replace(HydroNode::Placeholder),
1905            )
1906        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1907            // We can always weaken the ordering guarantee
1908            Stream::new(
1909                self.location.clone(),
1910                HydroNode::Cast {
1911                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1912                    metadata: self
1913                        .location
1914                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1915                },
1916            )
1917        } else {
1918            Stream::new(
1919                self.location.clone(),
1920                HydroNode::ObserveNonDet {
1921                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922                    trusted: true,
1923                    metadata: self
1924                        .location
1925                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1926                },
1927            )
1928        }
1929    }
1930
1931    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1932    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1933    /// which is always safe because that is the weakest possible guarantee.
1934    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1935        self.weaken_ordering::<NoOrder>()
1936    }
1937
1938    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1939    /// enforcing that `O2` is weaker than the input ordering guarantee.
1940    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1941        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1942        self.assume_ordering::<O2>(nondet)
1943    }
1944
1945    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1946    /// implies that `O == TotalOrder`.
1947    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1948    where
1949        O: IsOrdered,
1950    {
1951        self.assume_ordering(nondet!(/** no-op */))
1952    }
1953
1954    /// Explicitly "casts" the stream to a type with a different retries
1955    /// guarantee. Useful in unsafe code where the lack of retries cannot
1956    /// be proven by the type-system.
1957    ///
1958    /// # Non-Determinism
1959    /// This function is used as an escape hatch, and any mistakes in the
1960    /// provided retries guarantee will propagate into the guarantees
1961    /// for the rest of the program.
1962    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1963        if R::RETRIES_KIND == R2::RETRIES_KIND {
1964            Stream::new(
1965                self.location.clone(),
1966                self.ir_node.replace(HydroNode::Placeholder),
1967            )
1968        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1969            // We can always weaken the retries guarantee
1970            Stream::new(
1971                self.location.clone(),
1972                HydroNode::Cast {
1973                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1974                    metadata: self
1975                        .location
1976                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1977                },
1978            )
1979        } else {
1980            Stream::new(
1981                self.location.clone(),
1982                HydroNode::ObserveNonDet {
1983                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1984                    trusted: false,
1985                    metadata: self
1986                        .location
1987                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1988                },
1989            )
1990        }
1991    }
1992
1993    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1994    // is not observable
1995    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1996        if R::RETRIES_KIND == R2::RETRIES_KIND {
1997            Stream::new(
1998                self.location.clone(),
1999                self.ir_node.replace(HydroNode::Placeholder),
2000            )
2001        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2002            // We can always weaken the retries guarantee
2003            Stream::new(
2004                self.location.clone(),
2005                HydroNode::Cast {
2006                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2007                    metadata: self
2008                        .location
2009                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2010                },
2011            )
2012        } else {
2013            Stream::new(
2014                self.location.clone(),
2015                HydroNode::ObserveNonDet {
2016                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2017                    trusted: true,
2018                    metadata: self
2019                        .location
2020                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2021                },
2022            )
2023        }
2024    }
2025
2026    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2027    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2028    /// which is always safe because that is the weakest possible guarantee.
2029    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2030        self.weaken_retries::<AtLeastOnce>()
2031    }
2032
2033    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2034    /// enforcing that `R2` is weaker than the input retries guarantee.
2035    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2036        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2037        self.assume_retries::<R2>(nondet)
2038    }
2039
2040    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2041    /// implies that `R == ExactlyOnce`.
2042    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2043    where
2044        R: IsExactlyOnce,
2045    {
2046        self.assume_retries(nondet!(/** no-op */))
2047    }
2048
2049    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2050    /// implies that `B == Bounded`.
2051    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2052    where
2053        B: IsBounded,
2054    {
2055        Stream::new(
2056            self.location.clone(),
2057            self.ir_node.replace(HydroNode::Placeholder),
2058        )
2059    }
2060}
2061
2062impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2063where
2064    L: Location<'a>,
2065{
2066    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2067    ///
2068    /// # Example
2069    /// ```rust
2070    /// # #[cfg(feature = "deploy")] {
2071    /// # use hydro_lang::prelude::*;
2072    /// # use futures::StreamExt;
2073    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2074    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2075    /// # }, |mut stream| async move {
2076    /// // 1, 2, 3
2077    /// # for w in vec![1, 2, 3] {
2078    /// #     assert_eq!(stream.next().await.unwrap(), w);
2079    /// # }
2080    /// # }));
2081    /// # }
2082    /// ```
2083    pub fn cloned(self) -> Stream<T, L, B, O, R>
2084    where
2085        T: Clone,
2086    {
2087        self.map(q!(|d| d.clone()))
2088    }
2089}
2090
2091impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2092where
2093    L: Location<'a>,
2094{
2095    /// Computes the number of elements in the stream as a [`Singleton`].
2096    ///
2097    /// # Example
2098    /// ```rust
2099    /// # #[cfg(feature = "deploy")] {
2100    /// # use hydro_lang::prelude::*;
2101    /// # use futures::StreamExt;
2102    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2103    /// let tick = process.tick();
2104    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2105    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2106    /// batch.count().all_ticks()
2107    /// # }, |mut stream| async move {
2108    /// // 4
2109    /// # assert_eq!(stream.next().await.unwrap(), 4);
2110    /// # }));
2111    /// # }
2112    /// ```
2113    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2114        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2115            /// Order does not affect eventual count, and also does not affect intermediate states.
2116        ))
2117        .fold(
2118            q!(|| 0usize),
2119            q!(
2120                |count, _| *count += 1,
2121                monotone = manual_proof!(/** += 1 is monotone */)
2122            ),
2123        )
2124    }
2125}
2126
2127impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2128    /// Produces a new stream that merges the elements of the two input streams.
2129    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2130    ///
2131    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2132    /// [`Bounded`], you can use [`Stream::chain`] instead.
2133    ///
2134    /// # Example
2135    /// ```rust
2136    /// # #[cfg(feature = "deploy")] {
2137    /// # use hydro_lang::prelude::*;
2138    /// # use futures::StreamExt;
2139    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2140    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2141    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2142    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2143    /// # }, |mut stream| async move {
2144    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2145    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2146    /// #     assert_eq!(stream.next().await.unwrap(), w);
2147    /// # }
2148    /// # }));
2149    /// # }
2150    /// ```
2151    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2152        self,
2153        other: Stream<T, L, Unbounded, O2, R2>,
2154    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2155    where
2156        R: MinRetries<R2>,
2157    {
2158        Stream::new(
2159            self.location.clone(),
2160            HydroNode::Chain {
2161                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2162                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2163                metadata: self.location.new_node_metadata(Stream::<
2164                    T,
2165                    L,
2166                    Unbounded,
2167                    NoOrder,
2168                    <R as MinRetries<R2>>::Min,
2169                >::collection_kind()),
2170            },
2171        )
2172    }
2173
2174    /// Deprecated: use [`Stream::merge_unordered`] instead.
2175    #[deprecated(note = "use `merge_unordered` instead")]
2176    pub fn interleave<O2: Ordering, R2: Retries>(
2177        self,
2178        other: Stream<T, L, Unbounded, O2, R2>,
2179    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2180    where
2181        R: MinRetries<R2>,
2182    {
2183        self.merge_unordered(other)
2184    }
2185}
2186
2187impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2188    /// Produces a new stream that combines the elements of the two input streams,
2189    /// preserving the relative order of elements within each input.
2190    ///
2191    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2192    /// [`Bounded`], you can use [`Stream::chain`] instead.
2193    ///
2194    /// # Non-Determinism
2195    /// The order in which elements *across* the two streams will be interleaved is
2196    /// non-deterministic, so the order of elements will vary across runs. If the output order
2197    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2198    /// unordered stream.
2199    ///
2200    /// # Example
2201    /// ```rust
2202    /// # #[cfg(feature = "deploy")] {
2203    /// # use hydro_lang::prelude::*;
2204    /// # use futures::StreamExt;
2205    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2206    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2207    /// # process.source_iter(q!(vec![1, 3])).into();
2208    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2209    /// # }, |mut stream| async move {
2210    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2211    /// # for w in vec![1, 3, 2, 4] {
2212    /// #     assert_eq!(stream.next().await.unwrap(), w);
2213    /// # }
2214    /// # }));
2215    /// # }
2216    /// ```
2217    pub fn merge_ordered<R2: Retries>(
2218        self,
2219        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2220        _nondet: NonDet,
2221    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2222    where
2223        R: MinRetries<R2>,
2224    {
2225        Stream::new(
2226            self.location.clone(),
2227            HydroNode::Chain {
2228                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2229                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2230                metadata: self.location.new_node_metadata(Stream::<
2231                    T,
2232                    L,
2233                    Unbounded,
2234                    TotalOrder,
2235                    <R as MinRetries<R2>>::Min,
2236                >::collection_kind()),
2237            },
2238        )
2239    }
2240}
2241
2242impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2243where
2244    L: Location<'a>,
2245{
2246    /// Produces a new stream that emits the input elements in sorted order.
2247    ///
2248    /// The input stream can have any ordering guarantee, but the output stream
2249    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2250    /// elements in the input stream are available, so it requires the input stream
2251    /// to be [`Bounded`].
2252    ///
2253    /// # Example
2254    /// ```rust
2255    /// # #[cfg(feature = "deploy")] {
2256    /// # use hydro_lang::prelude::*;
2257    /// # use futures::StreamExt;
2258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2259    /// let tick = process.tick();
2260    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2261    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2262    /// batch.sort().all_ticks()
2263    /// # }, |mut stream| async move {
2264    /// // 1, 2, 3, 4
2265    /// # for w in (1..5) {
2266    /// #     assert_eq!(stream.next().await.unwrap(), w);
2267    /// # }
2268    /// # }));
2269    /// # }
2270    /// ```
2271    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2272    where
2273        B: IsBounded,
2274        T: Ord,
2275    {
2276        let this = self.make_bounded();
2277        Stream::new(
2278            this.location.clone(),
2279            HydroNode::Sort {
2280                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2281                metadata: this
2282                    .location
2283                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2284            },
2285        )
2286    }
2287
2288    /// Produces a new stream that first emits the elements of the `self` stream,
2289    /// and then emits the elements of the `other` stream. The output stream has
2290    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2291    /// [`TotalOrder`] guarantee.
2292    ///
2293    /// Currently, both input streams must be [`Bounded`]. This operator will block
2294    /// on the first stream until all its elements are available. In a future version,
2295    /// we will relax the requirement on the `other` stream.
2296    ///
2297    /// # Example
2298    /// ```rust
2299    /// # #[cfg(feature = "deploy")] {
2300    /// # use hydro_lang::prelude::*;
2301    /// # use futures::StreamExt;
2302    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2303    /// let tick = process.tick();
2304    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2305    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2306    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2307    /// # }, |mut stream| async move {
2308    /// // 2, 3, 4, 5, 1, 2, 3, 4
2309    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2310    /// #     assert_eq!(stream.next().await.unwrap(), w);
2311    /// # }
2312    /// # }));
2313    /// # }
2314    /// ```
2315    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2316        self,
2317        other: Stream<T, L, B2, O2, R2>,
2318    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2319    where
2320        B: IsBounded,
2321        O: MinOrder<O2>,
2322        R: MinRetries<R2>,
2323    {
2324        check_matching_location(&self.location, &other.location);
2325
2326        Stream::new(
2327            self.location.clone(),
2328            HydroNode::Chain {
2329                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2330                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2331                metadata: self.location.new_node_metadata(Stream::<
2332                    T,
2333                    L,
2334                    B2,
2335                    <O as MinOrder<O2>>::Min,
2336                    <R as MinRetries<R2>>::Min,
2337                >::collection_kind()),
2338            },
2339        )
2340    }
2341
2342    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2343    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2344    /// because this is compiled into a nested loop.
2345    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2346        self,
2347        other: Stream<T2, L, Bounded, O2, R>,
2348    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2349    where
2350        B: IsBounded,
2351        T: Clone,
2352        T2: Clone,
2353    {
2354        let this = self.make_bounded();
2355        check_matching_location(&this.location, &other.location);
2356
2357        Stream::new(
2358            this.location.clone(),
2359            HydroNode::CrossProduct {
2360                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2361                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2362                metadata: this.location.new_node_metadata(Stream::<
2363                    (T, T2),
2364                    L,
2365                    Bounded,
2366                    <O2 as MinOrder<O>>::Min,
2367                    R,
2368                >::collection_kind()),
2369            },
2370        )
2371    }
2372
2373    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2374    /// `self` used as the values for *each* key.
2375    ///
2376    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2377    /// values. For example, it can be used to send the same set of elements to several cluster
2378    /// members, if the membership information is available as a [`KeyedSingleton`].
2379    ///
2380    /// # Example
2381    /// ```rust
2382    /// # #[cfg(feature = "deploy")] {
2383    /// # use hydro_lang::prelude::*;
2384    /// # use futures::StreamExt;
2385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2386    /// # let tick = process.tick();
2387    /// let keyed_singleton = // { 1: (), 2: () }
2388    /// # process
2389    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2390    /// #     .into_keyed()
2391    /// #     .batch(&tick, nondet!(/** test */))
2392    /// #     .first();
2393    /// let stream = // [ "a", "b" ]
2394    /// # process
2395    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2396    /// #     .batch(&tick, nondet!(/** test */));
2397    /// stream.repeat_with_keys(keyed_singleton)
2398    /// # .entries().all_ticks()
2399    /// # }, |mut stream| async move {
2400    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2401    /// # let mut results = Vec::new();
2402    /// # for _ in 0..4 {
2403    /// #     results.push(stream.next().await.unwrap());
2404    /// # }
2405    /// # results.sort();
2406    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2407    /// # }));
2408    /// # }
2409    /// ```
2410    pub fn repeat_with_keys<K, V2>(
2411        self,
2412        keys: KeyedSingleton<K, V2, L, Bounded>,
2413    ) -> KeyedStream<K, T, L, Bounded, O, R>
2414    where
2415        B: IsBounded,
2416        K: Clone,
2417        T: Clone,
2418    {
2419        keys.keys()
2420            .weaken_retries()
2421            .assume_ordering_trusted::<TotalOrder>(
2422                nondet!(/** keyed stream does not depend on ordering of keys */),
2423            )
2424            .cross_product_nested_loop(self.make_bounded())
2425            .into_keyed()
2426    }
2427
2428    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2429    /// execution until all results are available. The output order is based on when futures
2430    /// complete, and may be different than the input order.
2431    ///
2432    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2433    /// while futures are pending, this variant blocks until the futures resolve.
2434    ///
2435    /// # Example
2436    /// ```rust
2437    /// # #[cfg(feature = "deploy")] {
2438    /// # use std::collections::HashSet;
2439    /// # use futures::StreamExt;
2440    /// # use hydro_lang::prelude::*;
2441    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2442    /// process
2443    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2444    ///     .map(q!(|x| async move {
2445    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2446    ///         x
2447    ///     }))
2448    ///     .resolve_futures_blocking()
2449    /// #   },
2450    /// #   |mut stream| async move {
2451    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2452    /// #       let mut output = HashSet::new();
2453    /// #       for _ in 1..10 {
2454    /// #           output.insert(stream.next().await.unwrap());
2455    /// #       }
2456    /// #       assert_eq!(
2457    /// #           output,
2458    /// #           HashSet::<i32>::from_iter(1..10)
2459    /// #       );
2460    /// #   },
2461    /// # ));
2462    /// # }
2463    /// ```
2464    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2465    where
2466        T: Future,
2467    {
2468        Stream::new(
2469            self.location.clone(),
2470            HydroNode::ResolveFuturesBlocking {
2471                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2472                metadata: self
2473                    .location
2474                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2475            },
2476        )
2477    }
2478
2479    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2480    ///
2481    /// # Example
2482    /// ```rust
2483    /// # #[cfg(feature = "deploy")] {
2484    /// # use hydro_lang::prelude::*;
2485    /// # use futures::StreamExt;
2486    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2487    /// let tick = process.tick();
2488    /// let empty: Stream<i32, _, Bounded> = process
2489    ///   .source_iter(q!(Vec::<i32>::new()))
2490    ///   .batch(&tick, nondet!(/** test */));
2491    /// empty.is_empty().all_ticks()
2492    /// # }, |mut stream| async move {
2493    /// // true
2494    /// # assert_eq!(stream.next().await.unwrap(), true);
2495    /// # }));
2496    /// # }
2497    /// ```
2498    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2499    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2500    where
2501        B: IsBounded,
2502    {
2503        self.make_bounded()
2504            .assume_ordering_trusted::<TotalOrder>(
2505                nondet!(/** is_empty intermediates unaffected by order */),
2506            )
2507            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** is_empty is idempotent */))
2508            .fold(q!(|| true), q!(|empty, _| { *empty = false },))
2509    }
2510}
2511
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        // Both inputs must be placed at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                // Output is `NoOrder` (see the return type) with the weaker of
                // the two inputs' retry guarantees.
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        // Both inputs must be placed at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                // The positive side's ordering / retry guarantees pass through unchanged.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2609
2610impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2611    Stream<(K, V), L, B, O, R>
2612{
2613    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2614    /// is used as the key and the second element is added to the entries associated with that key.
2615    ///
2616    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2617    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2618    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2619    /// total ordering _within_ each group but no ordering _across_ groups.
2620    ///
2621    /// # Example
2622    /// ```rust
2623    /// # #[cfg(feature = "deploy")] {
2624    /// # use hydro_lang::prelude::*;
2625    /// # use futures::StreamExt;
2626    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2627    /// process
2628    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2629    ///     .into_keyed()
2630    /// #   .entries()
2631    /// # }, |mut stream| async move {
2632    /// // { 1: [2, 3], 2: [4] }
2633    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2634    /// #     assert_eq!(stream.next().await.unwrap(), w);
2635    /// # }
2636    /// # }));
2637    /// # }
2638    /// ```
2639    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2640        KeyedStream::new(
2641            self.location.clone(),
2642            HydroNode::Cast {
2643                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2644                metadata: self
2645                    .location
2646                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2647            },
2648        )
2649    }
2650}
2651
2652impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2653where
2654    K: Eq + Hash,
2655    L: Location<'a>,
2656{
2657    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2658    /// # Example
2659    /// ```rust
2660    /// # #[cfg(feature = "deploy")] {
2661    /// # use hydro_lang::prelude::*;
2662    /// # use futures::StreamExt;
2663    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2664    /// let tick = process.tick();
2665    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2666    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2667    /// batch.keys().all_ticks()
2668    /// # }, |mut stream| async move {
2669    /// // 1, 2
2670    /// # assert_eq!(stream.next().await.unwrap(), 1);
2671    /// # assert_eq!(stream.next().await.unwrap(), 2);
2672    /// # }));
2673    /// # }
2674    /// ```
2675    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2676        self.into_keyed()
2677            .fold(
2678                q!(|| ()),
2679                q!(
2680                    |_, _| {},
2681                    commutative = manual_proof!(/** values are ignored */),
2682                    idempotent = manual_proof!(/** values are ignored */)
2683                ),
2684            )
2685            .keys()
2686    }
2687}
2688
2689impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2690where
2691    L: Location<'a> + NoTick,
2692{
2693    /// Returns a stream corresponding to the latest batch of elements being atomically
2694    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2695    /// the order of the input.
2696    ///
2697    /// # Non-Determinism
2698    /// The batch boundaries are non-deterministic and may change across executions.
2699    pub fn batch_atomic(
2700        self,
2701        tick: &Tick<L>,
2702        _nondet: NonDet,
2703    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2704        Stream::new(
2705            tick.clone(),
2706            HydroNode::Batch {
2707                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2708                metadata: tick
2709                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2710            },
2711        )
2712    }
2713
2714    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2715    /// See [`Stream::atomic`] for more details.
2716    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2717        Stream::new(
2718            self.location.tick.l.clone(),
2719            HydroNode::EndAtomic {
2720                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2721                metadata: self
2722                    .location
2723                    .tick
2724                    .l
2725                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2726            },
2727        )
2728    }
2729}
2730
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// #   },
    /// #   |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// #       let mut output = HashSet::new();
    /// #       for _ in 1..10 {
    /// #           output.insert(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           HashSet::<i32>::from_iter(1..10)
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Completion order is arbitrary, hence `NoOrder` in the output kind.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// #   },
    /// #   |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// #       let mut output = Vec::new();
    /// #       for _ in 1..10 {
    /// #           output.push(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Input ordering `O` is preserved through ordered resolution.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2820
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // Wrap the current tick in an `Atomic` location so downstream processing
        // stays synchronized with this tick.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
    /// #  .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![5, 6, 7]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift into an atomic unbounded stream, apply the user transformation,
        // then batch the result back into the tick.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2953
2954#[cfg(test)]
2955mod tests {
2956    #[cfg(feature = "deploy")]
2957    use futures::{SinkExt, StreamExt};
2958    #[cfg(feature = "deploy")]
2959    use hydro_deploy::Deployment;
2960    #[cfg(feature = "deploy")]
2961    use serde::{Deserialize, Serialize};
2962    #[cfg(any(feature = "deploy", feature = "sim"))]
2963    use stageleft::q;
2964
2965    #[cfg(any(feature = "deploy", feature = "sim"))]
2966    use crate::compile::builder::FlowBuilder;
2967    #[cfg(feature = "deploy")]
2968    use crate::live_collections::sliced::sliced;
2969    #[cfg(feature = "deploy")]
2970    use crate::live_collections::stream::ExactlyOnce;
2971    #[cfg(feature = "sim")]
2972    use crate::live_collections::stream::NoOrder;
2973    #[cfg(any(feature = "deploy", feature = "sim"))]
2974    use crate::live_collections::stream::TotalOrder;
2975    #[cfg(any(feature = "deploy", feature = "sim"))]
2976    use crate::location::Location;
2977    #[cfg(any(feature = "deploy", feature = "sim"))]
2978    use crate::nondet::nondet;
2979
2980    mod backtrace_chained_ops;
2981
    // Marker types naming the two processes used by the deploy tests below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload type sent between processes; serde derives are required for
    // bincode serialization over the network.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2992
    // End-to-end deploy test: streams 0..10 from one process to another over TCP
    // (fail-stop, bincode-encoded), then out to an external port, and checks that
    // the values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        // All three roles run on localhost for the test.
        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the external output port before starting the dataflow.
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
3027
    // Checks that `first()` collapses a batch to a single element: a 3-element
    // singleton is flattened, reduced to its first element, and the resulting
    // count observed externally should be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
3061
    // Verifies that `reduce` over an unbounded input accumulates across inputs:
    // sending 1 then 2 should observe running sums 1 and then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            // sample_eager emits the current accumulator as it changes.
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3095
    // Crosses an external input stream with a top-level folded singleton
    // (sum of [1, 2, 3] = 6); each input should arrive paired with 6.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3133
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        // A `reduce` over a non-empty bounded source yields exactly one value,
        // so `into_stream().count()` should always be 1; each external input is
        // then paired with that count via `cross_singleton`.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        // NOTE(review): `sliced!` with `use(..., nondet!(...))` appears to scope
        // the nondeterminism of observing these collections together — confirm
        // against the `sliced!` macro documentation.
        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Each input should be paired with cardinality 1 (single reduce output).
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3171
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        // Same shape as a plain reduce, but the reduce result is passed through
        // `into_singleton()` first; the cardinality observed downstream should
        // still be 1 for a non-empty source.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        // NOTE(review): `sliced!` with `use(..., nondet!(...))` appears to scope
        // the nondeterminism of observing these collections together — confirm
        // against the `sliced!` macro documentation.
        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Each input should be paired with cardinality 1.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3209
3210    #[cfg(feature = "deploy")]
3211    #[tokio::test]
3212    async fn atomic_fold_replays_each_tick() {
3213        let mut deployment = Deployment::new();
3214
3215        let mut flow = FlowBuilder::new();
3216        let node = flow.process::<()>();
3217        let external = flow.external::<()>();
3218
3219        let (input_port, input) =
3220            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3221        let tick = node.tick();
3222
3223        let out = input
3224            .batch(&tick, nondet!(/** test */))
3225            .cross_singleton(
3226                node.source_iter(q!(vec![1, 2, 3]))
3227                    .atomic()
3228                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3229                    .snapshot_atomic(&tick, nondet!(/** test */)),
3230            )
3231            .all_ticks()
3232            .send_bincode_external(&external);
3233
3234        let nodes = flow
3235            .with_process(&node, deployment.Localhost())
3236            .with_external(&external, deployment.Localhost())
3237            .deploy(&mut deployment);
3238
3239        deployment.deploy().await.unwrap();
3240
3241        let mut external_in = nodes.connect(input_port).await;
3242        let mut external_out = nodes.connect(out).await;
3243
3244        deployment.start().await.unwrap();
3245
3246        external_in.send(1).await.unwrap();
3247        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3248
3249        external_in.send(2).await.unwrap();
3250        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3251    }
3252
3253    #[cfg(feature = "deploy")]
3254    #[tokio::test]
3255    async fn unbounded_scan_remembers_state() {
3256        let mut deployment = Deployment::new();
3257
3258        let mut flow = FlowBuilder::new();
3259        let node = flow.process::<()>();
3260        let external = flow.external::<()>();
3261
3262        let (input_port, input) = node.source_external_bincode(&external);
3263        let out = input
3264            .scan(
3265                q!(|| 0),
3266                q!(|acc, v| {
3267                    *acc += v;
3268                    Some(*acc)
3269                }),
3270            )
3271            .send_bincode_external(&external);
3272
3273        let nodes = flow
3274            .with_process(&node, deployment.Localhost())
3275            .with_external(&external, deployment.Localhost())
3276            .deploy(&mut deployment);
3277
3278        deployment.deploy().await.unwrap();
3279
3280        let mut external_in = nodes.connect(input_port).await;
3281        let mut external_out = nodes.connect(out).await;
3282
3283        deployment.start().await.unwrap();
3284
3285        external_in.send(1).await.unwrap();
3286        assert_eq!(external_out.next().await.unwrap(), 1);
3287
3288        external_in.send(2).await.unwrap();
3289        assert_eq!(external_out.next().await.unwrap(), 3);
3290    }
3291
3292    #[cfg(feature = "deploy")]
3293    #[tokio::test]
3294    async fn unbounded_enumerate_remembers_state() {
3295        let mut deployment = Deployment::new();
3296
3297        let mut flow = FlowBuilder::new();
3298        let node = flow.process::<()>();
3299        let external = flow.external::<()>();
3300
3301        let (input_port, input) = node.source_external_bincode(&external);
3302        let out = input.enumerate().send_bincode_external(&external);
3303
3304        let nodes = flow
3305            .with_process(&node, deployment.Localhost())
3306            .with_external(&external, deployment.Localhost())
3307            .deploy(&mut deployment);
3308
3309        deployment.deploy().await.unwrap();
3310
3311        let mut external_in = nodes.connect(input_port).await;
3312        let mut external_out = nodes.connect(out).await;
3313
3314        deployment.start().await.unwrap();
3315
3316        external_in.send(1).await.unwrap();
3317        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3318
3319        external_in.send(2).await.unwrap();
3320        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3321    }
3322
3323    #[cfg(feature = "deploy")]
3324    #[tokio::test]
3325    async fn unbounded_unique_remembers_state() {
3326        let mut deployment = Deployment::new();
3327
3328        let mut flow = FlowBuilder::new();
3329        let node = flow.process::<()>();
3330        let external = flow.external::<()>();
3331
3332        let (input_port, input) =
3333            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3334        let out = input.unique().send_bincode_external(&external);
3335
3336        let nodes = flow
3337            .with_process(&node, deployment.Localhost())
3338            .with_external(&external, deployment.Localhost())
3339            .deploy(&mut deployment);
3340
3341        deployment.deploy().await.unwrap();
3342
3343        let mut external_in = nodes.connect(input_port).await;
3344        let mut external_out = nodes.connect(out).await;
3345
3346        deployment.start().await.unwrap();
3347
3348        external_in.send(1).await.unwrap();
3349        assert_eq!(external_out.next().await.unwrap(), 1);
3350
3351        external_in.send(2).await.unwrap();
3352        assert_eq!(external_out.next().await.unwrap(), 2);
3353
3354        external_in.send(1).await.unwrap();
3355        external_in.send(3).await.unwrap();
3356        assert_eq!(external_out.next().await.unwrap(), 3);
3357    }
3358
3359    #[cfg(feature = "sim")]
3360    #[test]
3361    #[should_panic]
3362    fn sim_batch_nondet_size() {
3363        let mut flow = FlowBuilder::new();
3364        let node = flow.process::<()>();
3365
3366        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3367
3368        let tick = node.tick();
3369        let out_recv = input
3370            .batch(&tick, nondet!(/** test */))
3371            .count()
3372            .all_ticks()
3373            .sim_output();
3374
3375        flow.sim().exhaustive(async || {
3376            in_send.send(());
3377            in_send.send(());
3378            in_send.send(());
3379
3380            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3381        });
3382    }
3383
3384    #[cfg(feature = "sim")]
3385    #[test]
3386    fn sim_batch_preserves_order() {
3387        let mut flow = FlowBuilder::new();
3388        let node = flow.process::<()>();
3389
3390        let (in_send, input) = node.sim_input();
3391
3392        let tick = node.tick();
3393        let out_recv = input
3394            .batch(&tick, nondet!(/** test */))
3395            .all_ticks()
3396            .sim_output();
3397
3398        flow.sim().exhaustive(async || {
3399            in_send.send(1);
3400            in_send.send(2);
3401            in_send.send(3);
3402
3403            out_recv.assert_yields_only([1, 2, 3]).await;
3404        });
3405    }
3406
3407    #[cfg(feature = "sim")]
3408    #[test]
3409    #[should_panic]
3410    fn sim_batch_unordered_shuffles() {
3411        let mut flow = FlowBuilder::new();
3412        let node = flow.process::<()>();
3413
3414        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3415
3416        let tick = node.tick();
3417        let batch = input.batch(&tick, nondet!(/** test */));
3418        let out_recv = batch
3419            .clone()
3420            .min()
3421            .zip(batch.max())
3422            .all_ticks()
3423            .sim_output();
3424
3425        flow.sim().exhaustive(async || {
3426            in_send.send_many_unordered([1, 2, 3]);
3427
3428            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3429                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3430            }
3431        });
3432    }
3433
3434    #[cfg(feature = "sim")]
3435    #[test]
3436    fn sim_batch_unordered_shuffles_count() {
3437        let mut flow = FlowBuilder::new();
3438        let node = flow.process::<()>();
3439
3440        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3441
3442        let tick = node.tick();
3443        let batch = input.batch(&tick, nondet!(/** test */));
3444        let out_recv = batch.all_ticks().sim_output();
3445
3446        let instance_count = flow.sim().exhaustive(async || {
3447            in_send.send_many_unordered([1, 2, 3, 4]);
3448            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3449        });
3450
3451        assert_eq!(
3452            instance_count,
3453            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3454        )
3455    }
3456
3457    #[cfg(feature = "sim")]
3458    #[test]
3459    #[should_panic]
3460    fn sim_observe_order_batched() {
3461        let mut flow = FlowBuilder::new();
3462        let node = flow.process::<()>();
3463
3464        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3465
3466        let tick = node.tick();
3467        let batch = input.batch(&tick, nondet!(/** test */));
3468        let out_recv = batch
3469            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3470            .all_ticks()
3471            .sim_output();
3472
3473        flow.sim().exhaustive(async || {
3474            in_send.send_many_unordered([1, 2, 3, 4]);
3475            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3476        });
3477    }
3478
3479    #[cfg(feature = "sim")]
3480    #[test]
3481    fn sim_observe_order_batched_count() {
3482        let mut flow = FlowBuilder::new();
3483        let node = flow.process::<()>();
3484
3485        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3486
3487        let tick = node.tick();
3488        let batch = input.batch(&tick, nondet!(/** test */));
3489        let out_recv = batch
3490            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3491            .all_ticks()
3492            .sim_output();
3493
3494        let instance_count = flow.sim().exhaustive(async || {
3495            in_send.send_many_unordered([1, 2, 3, 4]);
3496            let _ = out_recv.collect::<Vec<_>>().await;
3497        });
3498
3499        assert_eq!(
3500            instance_count,
3501            192 // 4! * 2^{4 - 1}
3502        )
3503    }
3504
3505    #[cfg(feature = "sim")]
3506    #[test]
3507    fn sim_unordered_count_instance_count() {
3508        let mut flow = FlowBuilder::new();
3509        let node = flow.process::<()>();
3510
3511        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3512
3513        let tick = node.tick();
3514        let out_recv = input
3515            .count()
3516            .snapshot(&tick, nondet!(/** test */))
3517            .all_ticks()
3518            .sim_output();
3519
3520        let instance_count = flow.sim().exhaustive(async || {
3521            in_send.send_many_unordered([1, 2, 3, 4]);
3522            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3523        });
3524
3525        assert_eq!(
3526            instance_count,
3527            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3528        )
3529    }
3530
3531    #[cfg(feature = "sim")]
3532    #[test]
3533    fn sim_top_level_assume_ordering() {
3534        let mut flow = FlowBuilder::new();
3535        let node = flow.process::<()>();
3536
3537        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3538
3539        let out_recv = input
3540            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3541            .sim_output();
3542
3543        let instance_count = flow.sim().exhaustive(async || {
3544            in_send.send_many_unordered([1, 2, 3]);
3545            let mut out = out_recv.collect::<Vec<_>>().await;
3546            out.sort();
3547            assert_eq!(out, vec![1, 2, 3]);
3548        });
3549
3550        assert_eq!(instance_count, 6)
3551    }
3552
3553    #[cfg(feature = "sim")]
3554    #[test]
3555    fn sim_top_level_assume_ordering_cycle_back() {
3556        let mut flow = FlowBuilder::new();
3557        let node = flow.process::<()>();
3558
3559        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3560
3561        let (complete_cycle_back, cycle_back) =
3562            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3563        let ordered = input
3564            .merge_unordered(cycle_back)
3565            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3566        complete_cycle_back.complete(
3567            ordered
3568                .clone()
3569                .map(q!(|v| v + 1))
3570                .filter(q!(|v| v % 2 == 1)),
3571        );
3572
3573        let out_recv = ordered.sim_output();
3574
3575        let mut saw = false;
3576        let instance_count = flow.sim().exhaustive(async || {
3577            in_send.send_many_unordered([0, 2]);
3578            let out = out_recv.collect::<Vec<_>>().await;
3579
3580            if out.starts_with(&[0, 1, 2]) {
3581                saw = true;
3582            }
3583        });
3584
3585        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3586        assert_eq!(instance_count, 6)
3587    }
3588
3589    #[cfg(feature = "sim")]
3590    #[test]
3591    fn sim_top_level_assume_ordering_cycle_back_tick() {
3592        let mut flow = FlowBuilder::new();
3593        let node = flow.process::<()>();
3594
3595        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3596
3597        let (complete_cycle_back, cycle_back) =
3598            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3599        let ordered = input
3600            .merge_unordered(cycle_back)
3601            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3602        complete_cycle_back.complete(
3603            ordered
3604                .clone()
3605                .batch(&node.tick(), nondet!(/** test */))
3606                .all_ticks()
3607                .map(q!(|v| v + 1))
3608                .filter(q!(|v| v % 2 == 1)),
3609        );
3610
3611        let out_recv = ordered.sim_output();
3612
3613        let mut saw = false;
3614        let instance_count = flow.sim().exhaustive(async || {
3615            in_send.send_many_unordered([0, 2]);
3616            let out = out_recv.collect::<Vec<_>>().await;
3617
3618            if out.starts_with(&[0, 1, 2]) {
3619                saw = true;
3620            }
3621        });
3622
3623        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3624        assert_eq!(instance_count, 58)
3625    }
3626
3627    #[cfg(feature = "sim")]
3628    #[test]
3629    fn sim_top_level_assume_ordering_multiple() {
3630        let mut flow = FlowBuilder::new();
3631        let node = flow.process::<()>();
3632
3633        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3634        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3635
3636        let (complete_cycle_back, cycle_back) =
3637            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3638        let input1_ordered = input
3639            .clone()
3640            .merge_unordered(cycle_back)
3641            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3642        let foo = input1_ordered
3643            .clone()
3644            .map(q!(|v| v + 3))
3645            .weaken_ordering::<NoOrder>()
3646            .merge_unordered(input2)
3647            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3648
3649        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3650
3651        let out_recv = input1_ordered.sim_output();
3652
3653        let mut saw = false;
3654        let instance_count = flow.sim().exhaustive(async || {
3655            in_send.send_many_unordered([0, 1]);
3656            let out = out_recv.collect::<Vec<_>>().await;
3657
3658            if out.starts_with(&[0, 3, 1]) {
3659                saw = true;
3660            }
3661        });
3662
3663        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3664        assert_eq!(instance_count, 24)
3665    }
3666
3667    #[cfg(feature = "sim")]
3668    #[test]
3669    fn sim_atomic_assume_ordering_cycle_back() {
3670        let mut flow = FlowBuilder::new();
3671        let node = flow.process::<()>();
3672
3673        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3674
3675        let (complete_cycle_back, cycle_back) =
3676            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3677        let ordered = input
3678            .merge_unordered(cycle_back)
3679            .atomic()
3680            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3681            .end_atomic();
3682        complete_cycle_back.complete(
3683            ordered
3684                .clone()
3685                .map(q!(|v| v + 1))
3686                .filter(q!(|v| v % 2 == 1)),
3687        );
3688
3689        let out_recv = ordered.sim_output();
3690
3691        let instance_count = flow.sim().exhaustive(async || {
3692            in_send.send_many_unordered([0, 2]);
3693            let out = out_recv.collect::<Vec<_>>().await;
3694            assert_eq!(out.len(), 4);
3695        });
3696
3697        assert_eq!(instance_count, 22)
3698    }
3699
3700    #[cfg(feature = "deploy")]
3701    #[tokio::test]
3702    async fn partition_evens_odds() {
3703        let mut deployment = Deployment::new();
3704
3705        let mut flow = FlowBuilder::new();
3706        let node = flow.process::<()>();
3707        let external = flow.external::<()>();
3708
3709        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3710        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3711        let evens_port = evens.send_bincode_external(&external);
3712        let odds_port = odds.send_bincode_external(&external);
3713
3714        let nodes = flow
3715            .with_process(&node, deployment.Localhost())
3716            .with_external(&external, deployment.Localhost())
3717            .deploy(&mut deployment);
3718
3719        deployment.deploy().await.unwrap();
3720
3721        let mut evens_out = nodes.connect(evens_port).await;
3722        let mut odds_out = nodes.connect(odds_port).await;
3723
3724        deployment.start().await.unwrap();
3725
3726        let mut even_results = Vec::new();
3727        for _ in 0..3 {
3728            even_results.push(evens_out.next().await.unwrap());
3729        }
3730        even_results.sort();
3731        assert_eq!(even_results, vec![2, 4, 6]);
3732
3733        let mut odd_results = Vec::new();
3734        for _ in 0..3 {
3735            odd_results.push(odds_out.next().await.unwrap());
3736        }
3737        odd_results.sort();
3738        assert_eq!(odd_results, vec![1, 3, 5]);
3739    }
3740
3741    #[cfg(feature = "deploy")]
3742    #[tokio::test]
3743    async fn unconsumed_inspect_still_runs() {
3744        use crate::deploy::DeployCrateWrapper;
3745
3746        let mut deployment = Deployment::new();
3747
3748        let mut flow = FlowBuilder::new();
3749        let node = flow.process::<()>();
3750
3751        // The return value of .inspect() is intentionally dropped.
3752        // Before the Null-root fix, this would silently do nothing.
3753        node.source_iter(q!(0..5))
3754            .inspect(q!(|x| println!("inspect: {}", x)));
3755
3756        let nodes = flow
3757            .with_process(&node, deployment.Localhost())
3758            .deploy(&mut deployment);
3759
3760        deployment.deploy().await.unwrap();
3761
3762        let mut stdout = nodes.get_process(&node).stdout();
3763
3764        deployment.start().await.unwrap();
3765
3766        let mut lines = Vec::new();
3767        for _ in 0..5 {
3768            lines.push(stdout.recv().await.unwrap());
3769        }
3770        lines.sort();
3771        assert_eq!(
3772            lines,
3773            vec![
3774                "inspect: 0",
3775                "inspect: 1",
3776                "inspect: 2",
3777                "inspect: 3",
3778                "inspect: 4",
3779            ]
3780        );
3781    }
3782
3783    #[cfg(feature = "sim")]
3784    #[test]
3785    fn sim_limit() {
3786        let mut flow = FlowBuilder::new();
3787        let node = flow.process::<()>();
3788
3789        let (in_send, input) = node.sim_input();
3790
3791        let out_recv = input.limit(q!(3)).sim_output();
3792
3793        flow.sim().exhaustive(async || {
3794            in_send.send(1);
3795            in_send.send(2);
3796            in_send.send(3);
3797            in_send.send(4);
3798            in_send.send(5);
3799
3800            out_recv.assert_yields_only([1, 2, 3]).await;
3801        });
3802    }
3803
3804    #[cfg(feature = "sim")]
3805    #[test]
3806    fn sim_limit_zero() {
3807        let mut flow = FlowBuilder::new();
3808        let node = flow.process::<()>();
3809
3810        let (in_send, input) = node.sim_input();
3811
3812        let out_recv = input.limit(q!(0)).sim_output();
3813
3814        flow.sim().exhaustive(async || {
3815            in_send.send(1);
3816            in_send.send(2);
3817
3818            out_recv.assert_yields_only::<i32, _>([]).await;
3819        });
3820    }
3821
3822    #[cfg(feature = "deploy")]
3823    #[tokio::test]
3824    async fn monotone_fold_threshold() {
3825        use crate::properties::manual_proof;
3826
3827        let mut deployment = Deployment::new();
3828
3829        let mut flow = FlowBuilder::new();
3830        let node = flow.process::<()>();
3831        let external = flow.external::<()>();
3832
3833        let in_unbounded: super::Stream<_, _> =
3834            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3835        let sum = in_unbounded.fold(
3836            q!(|| 0),
3837            q!(
3838                |sum, v| {
3839                    *sum += v;
3840                },
3841                monotone = manual_proof!(/** test */)
3842            ),
3843        );
3844
3845        let threshold_out = sum
3846            .threshold_greater_or_equal(node.singleton(q!(7)))
3847            .send_bincode_external(&external);
3848
3849        let nodes = flow
3850            .with_process(&node, deployment.Localhost())
3851            .with_external(&external, deployment.Localhost())
3852            .deploy(&mut deployment);
3853
3854        deployment.deploy().await.unwrap();
3855
3856        let mut threshold_out = nodes.connect(threshold_out).await;
3857
3858        deployment.start().await.unwrap();
3859
3860        assert_eq!(threshold_out.next().await.unwrap(), 7);
3861    }
3862
3863    #[cfg(feature = "deploy")]
3864    #[tokio::test]
3865    async fn monotone_count_threshold() {
3866        let mut deployment = Deployment::new();
3867
3868        let mut flow = FlowBuilder::new();
3869        let node = flow.process::<()>();
3870        let external = flow.external::<()>();
3871
3872        let in_unbounded: super::Stream<_, _> =
3873            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
3874        let sum = in_unbounded.count();
3875
3876        let threshold_out = sum
3877            .threshold_greater_or_equal(node.singleton(q!(3)))
3878            .send_bincode_external(&external);
3879
3880        let nodes = flow
3881            .with_process(&node, deployment.Localhost())
3882            .with_external(&external, deployment.Localhost())
3883            .deploy(&mut deployment);
3884
3885        deployment.deploy().await.unwrap();
3886
3887        let mut threshold_out = nodes.connect(threshold_out).await;
3888
3889        deployment.start().await.unwrap();
3890
3891        assert_eq!(threshold_out.next().await.unwrap(), 3);
3892    }
3893}