hydro_lang/
stream.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21    CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This enum has no variants, so it can never be instantiated; it is used
/// purely as a type-level marker (it is the default `Order` parameter of
/// [`Stream`]).
pub enum TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This enum has no variants, so it can never be instantiated; it is used
/// purely as a type-level marker.
pub enum NoOrder {}
37
/// Helper trait for determining the weaker of two orderings.
///
/// `Self` and `Other` are the two ordering markers being compared
/// ([`TotalOrder`] / [`NoOrder`]). The trait is sealed, so the set of
/// implementations is fixed by this crate.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}
44
// Reflexive case: comparing an ordering with itself yields that same ordering.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}
49
// `TotalOrder` compared with `NoOrder`: the weaker guarantee is `NoOrder`.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}
54
// `NoOrder` compared with `TotalOrder`: the weaker guarantee is `NoOrder`.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
59
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This enum has no variants, so it can never be instantiated; it is used
/// purely as a type-level marker (it is the default `Retries` parameter of
/// [`Stream`]).
pub enum ExactlyOnce {}
63
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This enum has no variants, so it can never be instantiated; it is used
/// purely as a type-level marker.
pub enum AtLeastOnce {}
67
/// Helper trait for determining the weaker of two retry guarantees.
///
/// `Self` and `Other` are the two retry markers being compared
/// ([`ExactlyOnce`] / [`AtLeastOnce`]). The trait is sealed, so the set of
/// implementations is fixed by this crate.
#[sealed::sealed]
pub trait MinRetries<Other> {
    /// The weaker of the two retry guarantees.
    type Min;
}
74
// Reflexive case: comparing a retry guarantee with itself yields that same guarantee.
#[sealed::sealed]
impl<T> MinRetries<T> for T {
    type Min = T;
}
79
// `AtLeastOnce` compared with `ExactlyOnce`: the weaker guarantee is `AtLeastOnce`.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
84
85#[sealed::sealed]
86impl MinRetries<AtLeastOnce> for ExactlyOnce {
87    type Min = ExactlyOnce;
88}
89
/// An ordered sequence stream of elements of type `Type`.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either
///   [`ExactlyOnce`] or [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<Type, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
    // The location value this stream was created with.
    location: Loc,
    // The IR node that produces this stream. Kept in a `RefCell` so it can be
    // swapped in place through a shared reference (see the `Clone` impl).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct without storing values.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
}
105
106impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
107where
108    L: Location<'a>,
109{
110    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
111        Stream {
112            location: stream.location,
113            ir_node: stream.ir_node,
114            _phantom: PhantomData,
115        }
116    }
117}
118
119impl<'a, T, L, B, R> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>
120where
121    L: Location<'a>,
122{
123    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
124        Stream {
125            location: stream.location,
126            ir_node: stream.ir_node,
127            _phantom: PhantomData,
128        }
129    }
130}
131
132impl<'a, T, L, B, O> From<Stream<T, L, B, O, ExactlyOnce>> for Stream<T, L, B, O, AtLeastOnce>
133where
134    L: Location<'a>,
135{
136    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
137        Stream {
138            location: stream.location,
139            ir_node: stream.ir_node,
140            _phantom: PhantomData,
141        }
142    }
143}
144
145impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
146where
147    L: Location<'a>,
148{
149    fn location_kind(&self) -> LocationId {
150        self.location.id()
151    }
152}
153
// `DeferTick` for bounded streams that live inside a `Tick`.
impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably this path call resolves to an inherent
        // `Stream::defer_tick` defined elsewhere in the file rather than to this
        // trait method (which would recurse) — confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
162
163impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
164where
165    L: Location<'a>,
166{
167    type Location = Tick<L>;
168
169    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
170        let location_id = location.id();
171        Stream::new(
172            location.clone(),
173            HydroNode::CycleSource {
174                ident,
175                location_kind: location_id,
176                metadata: location.new_node_metadata::<T>(),
177            },
178        )
179    }
180}
181
182impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
183where
184    L: Location<'a>,
185{
186    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
187        assert_eq!(
188            self.location.id(),
189            expected_location,
190            "locations do not match"
191        );
192        self.location
193            .flow_state()
194            .borrow_mut()
195            .leaves
196            .as_mut()
197            .expect(FLOW_USED_MESSAGE)
198            .push(HydroLeaf::CycleSink {
199                ident,
200                location_kind: self.location_kind(),
201                input: Box::new(self.ir_node.into_inner()),
202                metadata: self.location.new_node_metadata::<T>(),
203            });
204    }
205}
206
207impl<'a, T, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
208where
209    L: Location<'a> + NoTick,
210{
211    type Location = L;
212
213    fn create_source(ident: syn::Ident, location: L) -> Self {
214        let location_id = location.id();
215
216        Stream::new(
217            location.clone(),
218            HydroNode::Persist {
219                inner: Box::new(HydroNode::CycleSource {
220                    ident,
221                    location_kind: location_id,
222                    metadata: location.new_node_metadata::<T>(),
223                }),
224                metadata: location.new_node_metadata::<T>(),
225            },
226        )
227    }
228}
229
230impl<'a, T, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
231where
232    L: Location<'a> + NoTick,
233{
234    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
235        assert_eq!(
236            self.location.id(),
237            expected_location,
238            "locations do not match"
239        );
240        let metadata = self.location.new_node_metadata::<T>();
241        self.location
242            .flow_state()
243            .borrow_mut()
244            .leaves
245            .as_mut()
246            .expect(FLOW_USED_MESSAGE)
247            .push(HydroLeaf::CycleSink {
248                ident,
249                location_kind: self.location_kind(),
250                input: Box::new(HydroNode::Unpersist {
251                    inner: Box::new(self.ir_node.into_inner()),
252                    metadata: metadata.clone(),
253                }),
254                metadata,
255            });
256    }
257}
258
259impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
260where
261    L: Location<'a>,
262{
263    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
264        Stream {
265            location,
266            ir_node: RefCell::new(ir_node),
267            _phantom: PhantomData,
268        }
269    }
270}
271
272impl<'a, T, L, B, O, R> Clone for Stream<T, L, B, O, R>
273where
274    T: Clone,
275    L: Location<'a>,
276{
277    fn clone(&self) -> Self {
278        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
279            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
280            *self.ir_node.borrow_mut() = HydroNode::Tee {
281                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
282                metadata: self.location.new_node_metadata::<T>(),
283            };
284        }
285
286        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
287            Stream {
288                location: self.location.clone(),
289                ir_node: HydroNode::Tee {
290                    inner: TeeNode(inner.0.clone()),
291                    metadata: metadata.clone(),
292                }
293                .into(),
294                _phantom: PhantomData,
295            }
296        } else {
297            unreachable!()
298        }
299    }
300}
301
302impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
303where
304    L: Location<'a>,
305{
306    /// Produces a stream based on invoking `f` on each element in order.
307    /// If you do not want to modify the stream and instead only want to view
308    /// each item use [`Stream::inspect`] instead.
309    ///
310    /// # Example
311    /// ```rust
312    /// # use hydro_lang::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
315    /// let words = process.source_iter(q!(vec!["hello", "world"]));
316    /// words.map(q!(|x| x.to_uppercase()))
317    /// # }, |mut stream| async move {
318    /// # for w in vec!["HELLO", "WORLD"] {
319    /// #     assert_eq!(stream.next().await.unwrap(), w);
320    /// # }
321    /// # }));
322    /// ```
323    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
324    where
325        F: Fn(T) -> U + 'a,
326    {
327        let f = f.splice_fn1_ctx(&self.location).into();
328        Stream::new(
329            self.location.clone(),
330            HydroNode::Map {
331                f,
332                input: Box::new(self.ir_node.into_inner()),
333                metadata: self.location.new_node_metadata::<U>(),
334            },
335        )
336    }
337
338    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
339    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
340    /// for the output type `U` must produce items in a **deterministic** order.
341    ///
342    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
343    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
344    ///
345    /// # Example
346    /// ```rust
347    /// # use hydro_lang::*;
348    /// # use futures::StreamExt;
349    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
350    /// process
351    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
352    ///     .flat_map_ordered(q!(|x| x))
353    /// # }, |mut stream| async move {
354    /// // 1, 2, 3, 4
355    /// # for w in (1..5) {
356    /// #     assert_eq!(stream.next().await.unwrap(), w);
357    /// # }
358    /// # }));
359    /// ```
360    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
361    where
362        I: IntoIterator<Item = U>,
363        F: Fn(T) -> I + 'a,
364    {
365        let f = f.splice_fn1_ctx(&self.location).into();
366        Stream::new(
367            self.location.clone(),
368            HydroNode::FlatMap {
369                f,
370                input: Box::new(self.ir_node.into_inner()),
371                metadata: self.location.new_node_metadata::<U>(),
372            },
373        )
374    }
375
376    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
377    /// for the output type `U` to produce items in any order.
378    ///
379    /// # Example
380    /// ```rust
381    /// # use hydro_lang::{*, stream::ExactlyOnce};
382    /// # use futures::StreamExt;
383    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
384    /// process
385    ///     .source_iter(q!(vec![
386    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
387    ///         std::collections::HashSet::from_iter(vec![3, 4]),
388    ///     ]))
389    ///     .flat_map_unordered(q!(|x| x))
390    /// # }, |mut stream| async move {
391    /// // 1, 2, 3, 4, but in no particular order
392    /// # let mut results = Vec::new();
393    /// # for w in (1..5) {
394    /// #     results.push(stream.next().await.unwrap());
395    /// # }
396    /// # results.sort();
397    /// # assert_eq!(results, vec![1, 2, 3, 4]);
398    /// # }));
399    /// ```
400    pub fn flat_map_unordered<U, I, F>(
401        self,
402        f: impl IntoQuotedMut<'a, F, L>,
403    ) -> Stream<U, L, B, NoOrder, R>
404    where
405        I: IntoIterator<Item = U>,
406        F: Fn(T) -> I + 'a,
407    {
408        let f = f.splice_fn1_ctx(&self.location).into();
409        Stream::new(
410            self.location.clone(),
411            HydroNode::FlatMap {
412                f,
413                input: Box::new(self.ir_node.into_inner()),
414                metadata: self.location.new_node_metadata::<U>(),
415            },
416        )
417    }
418
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// Implemented as [`Stream::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
445
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// Implemented as [`Stream::flat_map_unordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
475
476    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
477    /// `f`, preserving the order of the elements.
478    ///
479    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
480    /// not modify or take ownership of the values. If you need to modify the values while filtering
481    /// use [`Stream::filter_map`] instead.
482    ///
483    /// # Example
484    /// ```rust
485    /// # use hydro_lang::*;
486    /// # use futures::StreamExt;
487    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
488    /// process
489    ///     .source_iter(q!(vec![1, 2, 3, 4]))
490    ///     .filter(q!(|&x| x > 2))
491    /// # }, |mut stream| async move {
492    /// // 3, 4
493    /// # for w in (3..5) {
494    /// #     assert_eq!(stream.next().await.unwrap(), w);
495    /// # }
496    /// # }));
497    /// ```
498    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
499    where
500        F: Fn(&T) -> bool + 'a,
501    {
502        let f = f.splice_fn1_borrow_ctx(&self.location).into();
503        Stream::new(
504            self.location.clone(),
505            HydroNode::Filter {
506                f,
507                input: Box::new(self.ir_node.into_inner()),
508                metadata: self.location.new_node_metadata::<T>(),
509            },
510        )
511    }
512
513    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
514    ///
515    /// # Example
516    /// ```rust
517    /// # use hydro_lang::*;
518    /// # use futures::StreamExt;
519    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
520    /// process
521    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
522    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
523    /// # }, |mut stream| async move {
524    /// // 1, 2
525    /// # for w in (1..3) {
526    /// #     assert_eq!(stream.next().await.unwrap(), w);
527    /// # }
528    /// # }));
529    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
530    where
531        F: Fn(T) -> Option<U> + 'a,
532    {
533        let f = f.splice_fn1_ctx(&self.location).into();
534        Stream::new(
535            self.location.clone(),
536            HydroNode::FilterMap {
537                f,
538                input: Box::new(self.ir_node.into_inner()),
539                metadata: self.location.new_node_metadata::<U>(),
540            },
541        )
542    }
543
544    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
545    /// where `x` is the final value of `other`, a bounded [`Singleton`].
546    ///
547    /// # Example
548    /// ```rust
549    /// # use hydro_lang::*;
550    /// # use futures::StreamExt;
551    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
552    /// let tick = process.tick();
553    /// let batch = unsafe {
554    ///     process
555    ///         .source_iter(q!(vec![1, 2, 3, 4]))
556    ///         .tick_batch(&tick)
557    /// };
558    /// let count = batch.clone().count(); // `count()` returns a singleton
559    /// batch.cross_singleton(count).all_ticks()
560    /// # }, |mut stream| async move {
561    /// // (1, 4), (2, 4), (3, 4), (4, 4)
562    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
563    /// #     assert_eq!(stream.next().await.unwrap(), w);
564    /// # }
565    /// # }));
566    pub fn cross_singleton<O2>(
567        self,
568        other: impl Into<Optional<O2, L, Bounded>>,
569    ) -> Stream<(T, O2), L, B, O, R>
570    where
571        O2: Clone,
572    {
573        let other: Optional<O2, L, Bounded> = other.into();
574        check_matching_location(&self.location, &other.location);
575
576        Stream::new(
577            self.location.clone(),
578            HydroNode::CrossSingleton {
579                left: Box::new(self.ir_node.into_inner()),
580                right: Box::new(other.ir_node.into_inner()),
581                metadata: self.location.new_node_metadata::<(T, O2)>(),
582            },
583        )
584    }
585
586    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
587    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
588        self.cross_singleton(signal.map(q!(|_u| ())))
589            .map(q!(|(d, _signal)| d))
590    }
591
592    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
593    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
594        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
595    }
596
597    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
598    /// tupled pairs in a non-deterministic order.
599    ///
600    /// # Example
601    /// ```rust
602    /// # use hydro_lang::*;
603    /// # use std::collections::HashSet;
604    /// # use futures::StreamExt;
605    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
606    /// let tick = process.tick();
607    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
608    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
609    /// stream1.cross_product(stream2)
610    /// # }, |mut stream| async move {
611    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
612    /// # stream.map(|i| assert!(expected.contains(&i)));
613    /// # }));
614    pub fn cross_product<O2>(
615        self,
616        other: Stream<O2, L, B, O, R>,
617    ) -> Stream<(T, O2), L, B, NoOrder, R>
618    where
619        T: Clone,
620        O2: Clone,
621    {
622        check_matching_location(&self.location, &other.location);
623
624        Stream::new(
625            self.location.clone(),
626            HydroNode::CrossProduct {
627                left: Box::new(self.ir_node.into_inner()),
628                right: Box::new(other.ir_node.into_inner()),
629                metadata: self.location.new_node_metadata::<(T, O2)>(),
630            },
631        )
632    }
633
634    /// Takes one stream as input and filters out any duplicate occurrences. The output
635    /// contains all unique values from the input.
636    ///
637    /// # Example
638    /// ```rust
639    /// # use hydro_lang::*;
640    /// # use futures::StreamExt;
641    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
642    /// let tick = process.tick();
643    ///     process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
644    /// # }, |mut stream| async move {
645    /// # for w in vec![1, 2, 3, 4] {
646    /// #     assert_eq!(stream.next().await.unwrap(), w);
647    /// # }
648    /// # }));
649    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
650    where
651        T: Eq + Hash,
652    {
653        Stream::new(
654            self.location.clone(),
655            HydroNode::Unique {
656                input: Box::new(self.ir_node.into_inner()),
657                metadata: self.location.new_node_metadata::<T>(),
658            },
659        )
660    }
661
662    /// Outputs everything in this stream that is *not* contained in the `other` stream.
663    ///
664    /// The `other` stream must be [`Bounded`], since this function will wait until
665    /// all its elements are available before producing any output.
666    /// # Example
667    /// ```rust
668    /// # use hydro_lang::*;
669    /// # use futures::StreamExt;
670    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
671    /// let tick = process.tick();
672    /// let stream = unsafe {
673    ///    process
674    ///    .source_iter(q!(vec![ 1, 2, 3, 4 ]))
675    ///    .tick_batch(&tick)
676    /// };
677    /// let batch = unsafe {
678    ///     process
679    ///         .source_iter(q!(vec![1, 2]))
680    ///         .tick_batch(&tick)
681    /// };
682    /// stream.filter_not_in(batch).all_ticks()
683    /// # }, |mut stream| async move {
684    /// # for w in vec![3, 4] {
685    /// #     assert_eq!(stream.next().await.unwrap(), w);
686    /// # }
687    /// # }));
688    pub fn filter_not_in<O2>(
689        self,
690        other: Stream<T, L, Bounded, O2, R>,
691    ) -> Stream<T, L, Bounded, O, R>
692    where
693        T: Eq + Hash,
694    {
695        check_matching_location(&self.location, &other.location);
696
697        Stream::new(
698            self.location.clone(),
699            HydroNode::Difference {
700                pos: Box::new(self.ir_node.into_inner()),
701                neg: Box::new(other.ir_node.into_inner()),
702                metadata: self.location.new_node_metadata::<T>(),
703            },
704        )
705    }
706
707    /// An operator which allows you to "inspect" each element of a stream without
708    /// modifying it. The closure `f` is called on a reference to each item. This is
709    /// mainly useful for debugging, and should not be used to generate side-effects.
710    ///
711    /// # Example
712    /// ```rust
713    /// # use hydro_lang::*;
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
716    /// let nums = process.source_iter(q!(vec![1, 2]));
717    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
718    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
719    /// # }, |mut stream| async move {
720    /// # for w in vec![1, 2] {
721    /// #     assert_eq!(stream.next().await.unwrap(), w);
722    /// # }
723    /// # }));
724    /// ```
725    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
726    where
727        F: Fn(&T) + 'a,
728    {
729        let f = f.splice_fn1_borrow_ctx(&self.location).into();
730
731        if L::is_top_level() {
732            Stream::new(
733                self.location.clone(),
734                HydroNode::Persist {
735                    inner: Box::new(HydroNode::Inspect {
736                        f,
737                        input: Box::new(HydroNode::Unpersist {
738                            inner: Box::new(self.ir_node.into_inner()),
739                            metadata: self.location.new_node_metadata::<T>(),
740                        }),
741                        metadata: self.location.new_node_metadata::<T>(),
742                    }),
743                    metadata: self.location.new_node_metadata::<T>(),
744                },
745            )
746        } else {
747            Stream::new(
748                self.location.clone(),
749                HydroNode::Inspect {
750                    f,
751                    input: Box::new(self.ir_node.into_inner()),
752                    metadata: self.location.new_node_metadata::<T>(),
753                },
754            )
755        }
756    }
757
758    /// Explicitly "casts" the stream to a type with a different ordering
759    /// guarantee. Useful in unsafe code where the ordering cannot be proven
760    /// by the type-system.
761    ///
762    /// # Safety
763    /// This function is used as an escape hatch, and any mistakes in the
764    /// provided ordering guarantee will propagate into the guarantees
765    /// for the rest of the program.
766    ///
767    /// # Example
768    /// # TODO: more sensible code after Shadaj merges
769    /// ```rust
770    /// # use hydro_lang::*;
771    /// # use std::collections::HashSet;
772    /// # use futures::StreamExt;
773    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
774    /// let nums = process.source_iter(q!({
775    ///     let now = std::time::SystemTime::now();
776    ///     match now.elapsed().unwrap().as_secs() % 2 {
777    ///         0 => vec![5, 4, 3, 2, 1],
778    ///         _ => vec![1, 2, 3, 4, 5],
779    ///     }
780    ///     .into_iter()
781    /// }));
782    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
783    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
784    /// stream
785    /// # }, |mut stream| async move {
786    /// # for w in vec![1, 2, 3, 4, 5] {
787    /// #     assert!((1..=5).contains(&stream.next().await.unwrap()));
788    /// # }
789    /// # }));
790    /// ```
791    pub unsafe fn assume_ordering<O2>(self) -> Stream<T, L, B, O2, R> {
792        Stream::new(self.location, self.ir_node.into_inner())
793    }
794
795    /// Explicitly "casts" the stream to a type with a different retries
796    /// guarantee. Useful in unsafe code where the lack of retries cannot
797    /// be proven by the type-system.
798    ///
799    /// # Safety
800    /// This function is used as an escape hatch, and any mistakes in the
801    /// provided retries guarantee will propagate into the guarantees
802    /// for the rest of the program.
803    pub unsafe fn assume_retries<R2>(self) -> Stream<T, L, B, O, R2> {
804        Stream::new(self.location, self.ir_node.into_inner())
805    }
806
807    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
808        unsafe {
809            // SAFETY: this is a weaker retry guarantee, so it is safe to assume
810            self.assume_retries::<AtLeastOnce>()
811        }
812    }
813}
814
// Operators available only on streams whose elements are references (`&T`).
impl<'a, T, L, B, O, R> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// Turns a stream of `&T` into a stream of owned `T`; location, boundedness,
    /// ordering and retries markers are unchanged.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
841
842impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
843where
844    L: Location<'a>,
845    O: MinOrder<NoOrder, Min = NoOrder>,
846{
847    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
848    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
849    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
850    ///
851    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
852    /// and there may be duplicates.
853    ///
854    /// # Example
855    /// ```rust
856    /// # use hydro_lang::*;
857    /// # use futures::StreamExt;
858    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
859    /// let tick = process.tick();
860    /// let bools = process.source_iter(q!(vec![false, true, false]));
861    /// let batch = unsafe { bools.tick_batch(&tick) };
862    /// batch
863    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
864    ///     .all_ticks()
865    /// # }, |mut stream| async move {
866    /// // true
867    /// # assert_eq!(stream.next().await.unwrap(), true);
868    /// # }));
869    /// ```
870    pub fn fold_commutative_idempotent<A, I, F>(
871        self,
872        init: impl IntoQuotedMut<'a, I, L>,
873        comb: impl IntoQuotedMut<'a, F, L>,
874    ) -> Singleton<A, L, B>
875    where
876        I: Fn() -> A + 'a,
877        F: Fn(&mut A, T),
878    {
879        unsafe {
880            // SAFETY: the combinator function is idempotent
881            self.assume_retries()
882        }
883        .fold_commutative(init, comb) // the combinator function is commutative
884    }
885
886    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
887    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
888    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
889    /// reference, so that it can be modified in place.
890    ///
891    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
892    /// and there may be duplicates.
893    ///
894    /// # Example
895    /// ```rust
896    /// # use hydro_lang::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// let bools = process.source_iter(q!(vec![false, true, false]));
901    /// let batch = unsafe { bools.tick_batch(&tick) };
902    /// batch
903    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
904    ///     .all_ticks()
905    /// # }, |mut stream| async move {
906    /// // true
907    /// # assert_eq!(stream.next().await.unwrap(), true);
908    /// # }));
909    /// ```
910    pub fn reduce_commutative_idempotent<F>(
911        self,
912        comb: impl IntoQuotedMut<'a, F, L>,
913    ) -> Optional<T, L, B>
914    where
915        F: Fn(&mut T, T) + 'a,
916    {
917        unsafe {
918            // SAFETY: the combinator function is idempotent
919            self.assume_retries()
920        }
921        .reduce_commutative(comb) // the combinator function is commutative
922    }
923
924    /// Computes the maximum element in the stream as an [`Optional`], which
925    /// will be empty until the first element in the input arrives.
926    ///
927    /// # Example
928    /// ```rust
929    /// # use hydro_lang::*;
930    /// # use futures::StreamExt;
931    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
932    /// let tick = process.tick();
933    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
934    /// let batch = unsafe { numbers.tick_batch(&tick) };
935    /// batch.max().all_ticks()
936    /// # }, |mut stream| async move {
937    /// // 4
938    /// # assert_eq!(stream.next().await.unwrap(), 4);
939    /// # }));
940    /// ```
941    pub fn max(self) -> Optional<T, L, B>
942    where
943        T: Ord,
944    {
945        self.reduce_commutative_idempotent(q!(|curr, new| {
946            if new > *curr {
947                *curr = new;
948            }
949        }))
950    }
951
952    /// Computes the minimum element in the stream as an [`Optional`], which
953    /// will be empty until the first element in the input arrives.
954    ///
955    /// # Example
956    /// ```rust
957    /// # use hydro_lang::*;
958    /// # use futures::StreamExt;
959    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
960    /// let tick = process.tick();
961    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
962    /// let batch = unsafe { numbers.tick_batch(&tick) };
963    /// batch.min().all_ticks()
964    /// # }, |mut stream| async move {
965    /// // 1
966    /// # assert_eq!(stream.next().await.unwrap(), 1);
967    /// # }));
968    /// ```
969    pub fn min(self) -> Optional<T, L, B>
970    where
971        T: Ord,
972    {
973        self.reduce_commutative_idempotent(q!(|curr, new| {
974            if new < *curr {
975                *curr = new;
976            }
977        }))
978    }
979}
980
981impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
982where
983    L: Location<'a>,
984    O: MinOrder<NoOrder, Min = NoOrder>,
985{
986    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
987    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
988    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
989    ///
990    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
991    ///
992    /// # Example
993    /// ```rust
994    /// # use hydro_lang::*;
995    /// # use futures::StreamExt;
996    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
997    /// let tick = process.tick();
998    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
999    /// let batch = unsafe { numbers.tick_batch(&tick) };
1000    /// batch
1001    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1002    ///     .all_ticks()
1003    /// # }, |mut stream| async move {
1004    /// // 10
1005    /// # assert_eq!(stream.next().await.unwrap(), 10);
1006    /// # }));
1007    /// ```
1008    pub fn fold_commutative<A, I, F>(
1009        self,
1010        init: impl IntoQuotedMut<'a, I, L>,
1011        comb: impl IntoQuotedMut<'a, F, L>,
1012    ) -> Singleton<A, L, B>
1013    where
1014        I: Fn() -> A + 'a,
1015        F: Fn(&mut A, T),
1016    {
1017        unsafe {
1018            // SAFETY: the combinator function is commutative
1019            self.assume_ordering()
1020        }
1021        .fold(init, comb)
1022    }
1023
1024    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1025    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1026    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1027    /// reference, so that it can be modified in place.
1028    ///
1029    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1030    ///
1031    /// # Example
1032    /// ```rust
1033    /// # use hydro_lang::*;
1034    /// # use futures::StreamExt;
1035    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1036    /// let tick = process.tick();
1037    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1038    /// let batch = unsafe { numbers.tick_batch(&tick) };
1039    /// batch
1040    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1041    ///     .all_ticks()
1042    /// # }, |mut stream| async move {
1043    /// // 10
1044    /// # assert_eq!(stream.next().await.unwrap(), 10);
1045    /// # }));
1046    /// ```
1047    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1048    where
1049        F: Fn(&mut T, T) + 'a,
1050    {
1051        unsafe {
1052            // SAFETY: the combinator function is commutative
1053            self.assume_ordering()
1054        }
1055        .reduce(comb)
1056    }
1057
1058    /// Computes the maximum element in the stream as an [`Optional`], where the
1059    /// maximum is determined according to the `key` function. The [`Optional`] will
1060    /// be empty until the first element in the input arrives.
1061    ///
1062    /// # Example
1063    /// ```rust
1064    /// # use hydro_lang::*;
1065    /// # use futures::StreamExt;
1066    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1067    /// let tick = process.tick();
1068    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1069    /// let batch = unsafe { numbers.tick_batch(&tick) };
1070    /// batch.max_by_key(q!(|x| -x)).all_ticks()
1071    /// # }, |mut stream| async move {
1072    /// // 1
1073    /// # assert_eq!(stream.next().await.unwrap(), 1);
1074    /// # }));
1075    /// ```
1076    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1077    where
1078        K: Ord,
1079        F: Fn(&T) -> K + 'a,
1080    {
1081        let f = key.splice_fn1_borrow_ctx(&self.location);
1082
1083        let wrapped: syn::Expr = parse_quote!({
1084            let key_fn = #f;
1085            move |curr, new| {
1086                if key_fn(&new) > key_fn(&*curr) {
1087                    *curr = new;
1088                }
1089            }
1090        });
1091
1092        let mut core = HydroNode::Reduce {
1093            f: wrapped.into(),
1094            input: Box::new(self.ir_node.into_inner()),
1095            metadata: self.location.new_node_metadata::<T>(),
1096        };
1097
1098        if L::is_top_level() {
1099            core = HydroNode::Persist {
1100                inner: Box::new(core),
1101                metadata: self.location.new_node_metadata::<T>(),
1102            };
1103        }
1104
1105        Optional::new(self.location, core)
1106    }
1107
1108    /// Computes the number of elements in the stream as a [`Singleton`].
1109    ///
1110    /// # Example
1111    /// ```rust
1112    /// # use hydro_lang::*;
1113    /// # use futures::StreamExt;
1114    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1115    /// let tick = process.tick();
1116    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1117    /// let batch = unsafe { numbers.tick_batch(&tick) };
1118    /// batch.count().all_ticks()
1119    /// # }, |mut stream| async move {
1120    /// // 4
1121    /// # assert_eq!(stream.next().await.unwrap(), 4);
1122    /// # }));
1123    /// ```
1124    pub fn count(self) -> Singleton<usize, L, B> {
1125        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1126    }
1127}
1128
1129impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
1130where
1131    L: Location<'a>,
1132{
1133    /// Returns a stream with the current count tupled with each element in the input stream.
1134    ///
1135    /// # Example
1136    /// ```rust
1137    /// # use hydro_lang::{*, stream::ExactlyOnce};
1138    /// # use futures::StreamExt;
1139    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1140    /// let tick = process.tick();
1141    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1142    /// numbers.enumerate()
1143    /// # }, |mut stream| async move {
1144    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1145    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1146    /// #     assert_eq!(stream.next().await.unwrap(), w);
1147    /// # }
1148    /// # }));
1149    /// ```
1150    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1151        if L::is_top_level() {
1152            Stream::new(
1153                self.location.clone(),
1154                HydroNode::Persist {
1155                    inner: Box::new(HydroNode::Enumerate {
1156                        is_static: true,
1157                        input: Box::new(HydroNode::Unpersist {
1158                            inner: Box::new(self.ir_node.into_inner()),
1159                            metadata: self.location.new_node_metadata::<T>(),
1160                        }),
1161                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1162                    }),
1163                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1164                },
1165            )
1166        } else {
1167            Stream::new(
1168                self.location.clone(),
1169                HydroNode::Enumerate {
1170                    is_static: false,
1171                    input: Box::new(self.ir_node.into_inner()),
1172                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1173                },
1174            )
1175        }
1176    }
1177
1178    /// Computes the first element in the stream as an [`Optional`], which
1179    /// will be empty until the first element in the input arrives.
1180    ///
1181    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1182    /// re-ordering of elements may cause the first element to change.
1183    ///
1184    /// # Example
1185    /// ```rust
1186    /// # use hydro_lang::*;
1187    /// # use futures::StreamExt;
1188    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1189    /// let tick = process.tick();
1190    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1191    /// let batch = unsafe { numbers.tick_batch(&tick) };
1192    /// batch.first().all_ticks()
1193    /// # }, |mut stream| async move {
1194    /// // 1
1195    /// # assert_eq!(stream.next().await.unwrap(), 1);
1196    /// # }));
1197    /// ```
1198    pub fn first(self) -> Optional<T, L, B> {
1199        Optional::new(self.location, self.ir_node.into_inner())
1200    }
1201
1202    /// Computes the last element in the stream as an [`Optional`], which
1203    /// will be empty until an element in the input arrives.
1204    ///
1205    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1206    /// re-ordering of elements may cause the last element to change.
1207    ///
1208    /// # Example
1209    /// ```rust
1210    /// # use hydro_lang::*;
1211    /// # use futures::StreamExt;
1212    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1213    /// let tick = process.tick();
1214    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1215    /// let batch = unsafe { numbers.tick_batch(&tick) };
1216    /// batch.last().all_ticks()
1217    /// # }, |mut stream| async move {
1218    /// // 4
1219    /// # assert_eq!(stream.next().await.unwrap(), 4);
1220    /// # }));
1221    /// ```
1222    pub fn last(self) -> Optional<T, L, B> {
1223        self.reduce(q!(|curr, new| *curr = new))
1224    }
1225
1226    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1227    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1228    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1229    ///
1230    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1231    /// to depend on the order of elements in the stream.
1232    ///
1233    /// # Example
1234    /// ```rust
1235    /// # use hydro_lang::*;
1236    /// # use futures::StreamExt;
1237    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1238    /// let tick = process.tick();
1239    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1240    /// let batch = unsafe { words.tick_batch(&tick) };
1241    /// batch
1242    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1243    ///     .all_ticks()
1244    /// # }, |mut stream| async move {
1245    /// // "HELLOWORLD"
1246    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1247    /// # }));
1248    /// ```
1249    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1250        self,
1251        init: impl IntoQuotedMut<'a, I, L>,
1252        comb: impl IntoQuotedMut<'a, F, L>,
1253    ) -> Singleton<A, L, B> {
1254        let init = init.splice_fn0_ctx(&self.location).into();
1255        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1256
1257        let mut core = HydroNode::Fold {
1258            init,
1259            acc: comb,
1260            input: Box::new(self.ir_node.into_inner()),
1261            metadata: self.location.new_node_metadata::<A>(),
1262        };
1263
1264        if L::is_top_level() {
1265            // top-level (possibly unbounded) singletons are represented as
1266            // a stream which produces all values from all ticks every tick,
1267            // so Unpersist will always give the lastest aggregation
1268            core = HydroNode::Persist {
1269                inner: Box::new(core),
1270                metadata: self.location.new_node_metadata::<A>(),
1271            };
1272        }
1273
1274        Singleton::new(self.location, core)
1275    }
1276
1277    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1278    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1279    /// until the first element in the input arrives.
1280    ///
1281    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1282    /// to depend on the order of elements in the stream.
1283    ///
1284    /// # Example
1285    /// ```rust
1286    /// # use hydro_lang::*;
1287    /// # use futures::StreamExt;
1288    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1289    /// let tick = process.tick();
1290    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1291    /// let batch = unsafe { words.tick_batch(&tick) };
1292    /// batch
1293    ///     .map(q!(|x| x.to_string()))
1294    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1295    ///     .all_ticks()
1296    /// # }, |mut stream| async move {
1297    /// // "HELLOWORLD"
1298    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1299    /// # }));
1300    /// ```
1301    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1302        self,
1303        comb: impl IntoQuotedMut<'a, F, L>,
1304    ) -> Optional<T, L, B> {
1305        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1306        let mut core = HydroNode::Reduce {
1307            f,
1308            input: Box::new(self.ir_node.into_inner()),
1309            metadata: self.location.new_node_metadata::<T>(),
1310        };
1311
1312        if L::is_top_level() {
1313            core = HydroNode::Persist {
1314                inner: Box::new(core),
1315                metadata: self.location.new_node_metadata::<T>(),
1316            };
1317        }
1318
1319        Optional::new(self.location, core)
1320    }
1321}
1322
1323impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1324    /// Produces a new stream that interleaves the elements of the two input streams.
1325    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1326    ///
1327    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1328    /// [`Bounded`], you can use [`Stream::chain`] instead.
1329    ///
1330    /// # Example
1331    /// ```rust
1332    /// # use hydro_lang::*;
1333    /// # use futures::StreamExt;
1334    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1335    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1336    /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
1337    /// # }, |mut stream| async move {
1338    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1339    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1340    /// #     assert_eq!(stream.next().await.unwrap(), w);
1341    /// # }
1342    /// # }));
1343    /// ```
1344    pub fn union<O2, R2: MinRetries<R>>(
1345        self,
1346        other: Stream<T, L, Unbounded, O2, R2>,
1347    ) -> Stream<T, L, Unbounded, NoOrder, R2::Min> {
1348        let tick = self.location.tick();
1349        unsafe {
1350            // SAFETY: Because the outputs are unordered,
1351            // we can interleave batches from both streams.
1352            self.tick_batch(&tick)
1353                .assume_ordering::<NoOrder>()
1354                .assume_retries::<R2::Min>()
1355                .chain(
1356                    other
1357                        .tick_batch(&tick)
1358                        .assume_ordering::<NoOrder>()
1359                        .assume_retries::<R2::Min>(),
1360                )
1361                .all_ticks()
1362                .assume_ordering()
1363        }
1364    }
1365}
1366
1367impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
1368where
1369    L: Location<'a>,
1370{
1371    /// Produces a new stream that emits the input elements in sorted order.
1372    ///
1373    /// The input stream can have any ordering guarantee, but the output stream
1374    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1375    /// elements in the input stream are available, so it requires the input stream
1376    /// to be [`Bounded`].
1377    ///
1378    /// # Example
1379    /// ```rust
1380    /// # use hydro_lang::*;
1381    /// # use futures::StreamExt;
1382    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1383    /// let tick = process.tick();
1384    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1385    /// let batch = unsafe { numbers.tick_batch(&tick) };
1386    /// batch.sort().all_ticks()
1387    /// # }, |mut stream| async move {
1388    /// // 1, 2, 3, 4
1389    /// # for w in (1..5) {
1390    /// #     assert_eq!(stream.next().await.unwrap(), w);
1391    /// # }
1392    /// # }));
1393    /// ```
1394    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1395    where
1396        T: Ord,
1397    {
1398        Stream::new(
1399            self.location.clone(),
1400            HydroNode::Sort {
1401                input: Box::new(self.ir_node.into_inner()),
1402                metadata: self.location.new_node_metadata::<T>(),
1403            },
1404        )
1405    }
1406
1407    /// Produces a new stream that first emits the elements of the `self` stream,
1408    /// and then emits the elements of the `other` stream. The output stream has
1409    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1410    /// [`TotalOrder`] guarantee.
1411    ///
1412    /// Currently, both input streams must be [`Bounded`]. This operator will block
1413    /// on the first stream until all its elements are available. In a future version,
1414    /// we will relax the requirement on the `other` stream.
1415    ///
1416    /// # Example
1417    /// ```rust
1418    /// # use hydro_lang::*;
1419    /// # use futures::StreamExt;
1420    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1421    /// let tick = process.tick();
1422    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1423    /// let batch = unsafe { numbers.tick_batch(&tick) };
1424    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1425    /// # }, |mut stream| async move {
1426    /// // 2, 3, 4, 5, 1, 2, 3, 4
1427    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1428    /// #     assert_eq!(stream.next().await.unwrap(), w);
1429    /// # }
1430    /// # }));
1431    /// ```
1432    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1433    where
1434        O: MinOrder<O2>,
1435    {
1436        check_matching_location(&self.location, &other.location);
1437
1438        Stream::new(
1439            self.location.clone(),
1440            HydroNode::Chain {
1441                first: Box::new(self.ir_node.into_inner()),
1442                second: Box::new(other.ir_node.into_inner()),
1443                metadata: self.location.new_node_metadata::<T>(),
1444            },
1445        )
1446    }
1447}
1448
1449impl<'a, K, V1, L, B, O, R> Stream<(K, V1), L, B, O, R>
1450where
1451    L: Location<'a>,
1452{
1453    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1454    /// by equi-joining the two streams on the key attribute `K`.
1455    ///
1456    /// # Example
1457    /// ```rust
1458    /// # use hydro_lang::*;
1459    /// # use std::collections::HashSet;
1460    /// # use futures::StreamExt;
1461    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1462    /// let tick = process.tick();
1463    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1464    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1465    /// stream1.join(stream2)
1466    /// # }, |mut stream| async move {
1467    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1468    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1469    /// # stream.map(|i| assert!(expected.contains(&i)));
1470    /// # }));
1471    pub fn join<V2, O2>(
1472        self,
1473        n: Stream<(K, V2), L, B, O2, R>,
1474    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
1475    where
1476        K: Eq + Hash,
1477    {
1478        check_matching_location(&self.location, &n.location);
1479
1480        Stream::new(
1481            self.location.clone(),
1482            HydroNode::Join {
1483                left: Box::new(self.ir_node.into_inner()),
1484                right: Box::new(n.ir_node.into_inner()),
1485                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1486            },
1487        )
1488    }
1489
1490    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1491    /// computes the anti-join of the items in the input -- i.e. returns
1492    /// unique items in the first input that do not have a matching key
1493    /// in the second input.
1494    ///
1495    /// # Example
1496    /// ```rust
1497    /// # use hydro_lang::*;
1498    /// # use futures::StreamExt;
1499    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1500    /// let tick = process.tick();
1501    /// let stream = unsafe {
1502    ///    process
1503    ///    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1504    ///    .tick_batch(&tick)
1505    /// };
1506    /// let batch = unsafe {
1507    ///     process
1508    ///         .source_iter(q!(vec![1, 2]))
1509    ///         .tick_batch(&tick)
1510    /// };
1511    /// stream.anti_join(batch).all_ticks()
1512    /// # }, |mut stream| async move {
1513    /// # for w in vec![(3, 'c'), (4, 'd')] {
1514    /// #     assert_eq!(stream.next().await.unwrap(), w);
1515    /// # }
1516    /// # }));
1517    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2, R>) -> Stream<(K, V1), L, B, O, R>
1518    where
1519        K: Eq + Hash,
1520    {
1521        check_matching_location(&self.location, &n.location);
1522
1523        Stream::new(
1524            self.location.clone(),
1525            HydroNode::AntiJoin {
1526                pos: Box::new(self.ir_node.into_inner()),
1527                neg: Box::new(n.ir_node.into_inner()),
1528                metadata: self.location.new_node_metadata::<(K, V1)>(),
1529            },
1530        )
1531    }
1532}
1533
1534impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1535where
1536    K: Eq + Hash,
1537    L: Location<'a>,
1538{
1539    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1540    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1541    /// in the second element are accumulated via the `comb` closure.
1542    ///
1543    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1544    /// to depend on the order of elements in the stream.
1545    ///
1546    /// If the input and output value types are the same and do not require initialization then use
1547    /// [`Stream::reduce_keyed`].
1548    ///
1549    /// # Example
1550    /// ```rust
1551    /// # use hydro_lang::*;
1552    /// # use futures::StreamExt;
1553    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1554    /// let tick = process.tick();
1555    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1556    /// let batch = unsafe { numbers.tick_batch(&tick) };
1557    /// batch
1558    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1559    ///     .all_ticks()
1560    /// # }, |mut stream| async move {
1561    /// // (1, 5), (2, 7)
1562    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1563    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1564    /// # }));
1565    /// ```
1566    pub fn fold_keyed<A, I, F>(
1567        self,
1568        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1569        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1570    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1571    where
1572        I: Fn() -> A + 'a,
1573        F: Fn(&mut A, V) + 'a,
1574    {
1575        let init = init.splice_fn0_ctx(&self.location).into();
1576        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1577
1578        Stream::new(
1579            self.location.clone(),
1580            HydroNode::FoldKeyed {
1581                init,
1582                acc: comb,
1583                input: Box::new(self.ir_node.into_inner()),
1584                metadata: self.location.new_node_metadata::<(K, A)>(),
1585            },
1586        )
1587    }
1588
1589    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1590    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1591    /// in the second element are accumulated via the `comb` closure.
1592    ///
1593    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1594    /// to depend on the order of elements in the stream.
1595    ///
1596    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1597    ///
1598    /// # Example
1599    /// ```rust
1600    /// # use hydro_lang::*;
1601    /// # use futures::StreamExt;
1602    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1603    /// let tick = process.tick();
1604    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1605    /// let batch = unsafe { numbers.tick_batch(&tick) };
1606    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1607    /// # }, |mut stream| async move {
1608    /// // (1, 5), (2, 7)
1609    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1610    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1611    /// # }));
1612    /// ```
1613    pub fn reduce_keyed<F>(
1614        self,
1615        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1616    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1617    where
1618        F: Fn(&mut V, V) + 'a,
1619    {
1620        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1621
1622        Stream::new(
1623            self.location.clone(),
1624            HydroNode::ReduceKeyed {
1625                f,
1626                input: Box::new(self.ir_node.into_inner()),
1627                metadata: self.location.new_node_metadata::<(K, V)>(),
1628            },
1629        )
1630    }
1631}
1632
1633impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
1634where
1635    K: Eq + Hash,
1636    L: Location<'a>,
1637{
1638    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1639    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1640    /// in the second element are accumulated via the `comb` closure.
1641    ///
1642    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1643    ///
1644    /// If the input and output value types are the same and do not require initialization then use
1645    /// [`Stream::reduce_keyed_commutative`].
1646    ///
1647    /// # Example
1648    /// ```rust
1649    /// # use hydro_lang::*;
1650    /// # use futures::StreamExt;
1651    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1652    /// let tick = process.tick();
1653    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1654    /// let batch = unsafe { numbers.tick_batch(&tick) };
1655    /// batch
1656    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1657    ///     .all_ticks()
1658    /// # }, |mut stream| async move {
1659    /// // (1, 5), (2, 7)
1660    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1661    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1662    /// # }));
1663    /// ```
1664    pub fn fold_keyed_commutative<A, I, F>(
1665        self,
1666        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1667        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1668    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1669    where
1670        I: Fn() -> A + 'a,
1671        F: Fn(&mut A, V) + 'a,
1672    {
1673        let init = init.splice_fn0_ctx(&self.location).into();
1674        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1675
1676        Stream::new(
1677            self.location.clone(),
1678            HydroNode::FoldKeyed {
1679                init,
1680                acc: comb,
1681                input: Box::new(self.ir_node.into_inner()),
1682                metadata: self.location.new_node_metadata::<(K, A)>(),
1683            },
1684        )
1685    }
1686
1687    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1688    /// # Example
1689    /// ```rust
1690    /// # use hydro_lang::*;
1691    /// # use futures::StreamExt;
1692    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1693    /// let tick = process.tick();
1694    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1695    /// let batch = unsafe { numbers.tick_batch(&tick) };
1696    /// batch.keys().all_ticks()
1697    /// # }, |mut stream| async move {
1698    /// // 1, 2
1699    /// # assert_eq!(stream.next().await.unwrap(), 1);
1700    /// # assert_eq!(stream.next().await.unwrap(), 2);
1701    /// # }));
1702    /// ```
1703    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1704        self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
1705            .map(q!(|(k, _)| k))
1706    }
1707
1708    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1709    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1710    /// in the second element are accumulated via the `comb` closure.
1711    ///
1712    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1713    ///
1714    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
1715    ///
1716    /// # Example
1717    /// ```rust
1718    /// # use hydro_lang::*;
1719    /// # use futures::StreamExt;
1720    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1721    /// let tick = process.tick();
1722    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1723    /// let batch = unsafe { numbers.tick_batch(&tick) };
1724    /// batch
1725    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
1726    ///     .all_ticks()
1727    /// # }, |mut stream| async move {
1728    /// // (1, 5), (2, 7)
1729    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1730    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1731    /// # }));
1732    /// ```
1733    pub fn reduce_keyed_commutative<F>(
1734        self,
1735        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1736    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1737    where
1738        F: Fn(&mut V, V) + 'a,
1739    {
1740        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1741
1742        Stream::new(
1743            self.location.clone(),
1744            HydroNode::ReduceKeyed {
1745                f,
1746                input: Box::new(self.ir_node.into_inner()),
1747                metadata: self.location.new_node_metadata::<(K, V)>(),
1748            },
1749        )
1750    }
1751}
1752
1753impl<'a, T, L, B, O, R> Stream<T, Atomic<L>, B, O, R>
1754where
1755    L: Location<'a> + NoTick,
1756{
1757    /// Returns a stream corresponding to the latest batch of elements being atomically
1758    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1759    /// the order of the input.
1760    ///
1761    /// # Safety
1762    /// The batch boundaries are non-deterministic and may change across executions.
1763    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, O, R> {
1764        Stream::new(
1765            self.location.clone().tick,
1766            HydroNode::Unpersist {
1767                inner: Box::new(self.ir_node.into_inner()),
1768                metadata: self.location.new_node_metadata::<T>(),
1769            },
1770        )
1771    }
1772
1773    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
1774        Stream::new(self.location.tick.l, self.ir_node.into_inner())
1775    }
1776
1777    pub fn atomic_source(&self) -> Tick<L> {
1778        self.location.tick.clone()
1779    }
1780}
1781
1782impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
1783where
1784    L: Location<'a> + NoTick + NoAtomic,
1785{
1786    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1787        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
1788    }
1789
1790    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
1791    /// Future outputs are produced as available, regardless of input arrival order.
1792    ///
1793    /// # Example
1794    /// ```rust
1795    /// # use std::collections::HashSet;
1796    /// # use futures::StreamExt;
1797    /// # use hydro_lang::*;
1798    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1799    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
1800    ///     .map(q!(|x| async move {
1801    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1802    ///         x
1803    ///     }))
1804    ///     .resolve_futures()
1805    /// #   },
1806    /// #   |mut stream| async move {
1807    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
1808    /// #       let mut output = HashSet::new();
1809    /// #       for _ in 1..10 {
1810    /// #           output.insert(stream.next().await.unwrap());
1811    /// #       }
1812    /// #       assert_eq!(
1813    /// #           output,
1814    /// #           HashSet::<i32>::from_iter(1..10)
1815    /// #       );
1816    /// #   },
1817    /// # ));
1818    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
1819    where
1820        T: Future<Output = T2>,
1821    {
1822        Stream::new(
1823            self.location.clone(),
1824            HydroNode::ResolveFutures {
1825                input: Box::new(self.ir_node.into_inner()),
1826                metadata: self.location.new_node_metadata::<T2>(),
1827            },
1828        )
1829    }
1830
1831    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1832    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1833    /// the order of the input.
1834    ///
1835    /// # Safety
1836    /// The batch boundaries are non-deterministic and may change across executions.
1837    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, O, R> {
1838        unsafe { self.atomic(tick).tick_batch() }
1839    }
1840
1841    /// Given a time interval, returns a stream corresponding to samples taken from the
1842    /// stream roughly at that interval. The output will have elements in the same order
1843    /// as the input, but with arbitrary elements skipped between samples. There is also
1844    /// no guarantee on the exact timing of the samples.
1845    ///
1846    /// # Safety
1847    /// The output stream is non-deterministic in which elements are sampled, since this
1848    /// is controlled by a clock.
1849    pub unsafe fn sample_every(
1850        self,
1851        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1852    ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
1853        let samples = unsafe {
1854            // SAFETY: source of intentional non-determinism
1855            self.location.source_interval(interval)
1856        };
1857
1858        let tick = self.location.tick();
1859        unsafe {
1860            // SAFETY: source of intentional non-determinism
1861            self.tick_batch(&tick)
1862                .continue_if(samples.tick_batch(&tick).first())
1863                .all_ticks()
1864                .weakest_retries()
1865        }
1866    }
1867
1868    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1869    /// stream has not emitted a value since that duration.
1870    ///
1871    /// # Safety
1872    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1873    /// samples take place, timeouts may be non-deterministically generated or missed,
1874    /// and the notification of the timeout may be delayed as well. There is also no
1875    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1876    /// detected based on when the next sample is taken.
1877    pub unsafe fn timeout(
1878        self,
1879        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1880    ) -> Optional<(), L, Unbounded>
1881    where
1882        O: MinOrder<NoOrder, Min = NoOrder>,
1883    {
1884        let tick = self.location.tick();
1885
1886        let latest_received = unsafe { self.assume_retries() }.fold_commutative(
1887            q!(|| None),
1888            q!(|latest, _| {
1889                *latest = Some(Instant::now());
1890            }),
1891        );
1892
1893        unsafe {
1894            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
1895            latest_received.latest_tick(&tick)
1896        }
1897        .filter_map(q!(move |latest_received| {
1898            if let Some(latest_received) = latest_received {
1899                if Instant::now().duration_since(latest_received) > duration {
1900                    Some(())
1901                } else {
1902                    None
1903                }
1904            } else {
1905                Some(())
1906            }
1907        }))
1908        .latest()
1909    }
1910}
1911
1912impl<'a, F, T, L, B, O, R> Stream<F, L, B, O, R>
1913where
1914    L: Location<'a> + NoTick + NoAtomic,
1915    F: Future<Output = T>,
1916{
1917    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
1918    /// Future outputs are produced in the same order as the input stream.
1919    ///
1920    /// # Example
1921    /// ```rust
1922    /// # use std::collections::HashSet;
1923    /// # use futures::StreamExt;
1924    /// # use hydro_lang::*;
1925    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1926    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
1927    ///     .map(q!(|x| async move {
1928    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1929    ///         x
1930    ///     }))
1931    ///     .resolve_futures_ordered()
1932    /// #   },
1933    /// #   |mut stream| async move {
1934    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
1935    /// #       let mut output = Vec::new();
1936    /// #       for _ in 1..10 {
1937    /// #           output.push(stream.next().await.unwrap());
1938    /// #       }
1939    /// #       assert_eq!(
1940    /// #           output,
1941    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
1942    /// #       );
1943    /// #   },
1944    /// # ));
1945    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
1946        Stream::new(
1947            self.location.clone(),
1948            HydroNode::ResolveFuturesOrdered {
1949                input: Box::new(self.ir_node.into_inner()),
1950                metadata: self.location.new_node_metadata::<T>(),
1951            },
1952        )
1953    }
1954}
1955
1956impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
1957where
1958    L: Location<'a> + NoTick,
1959{
1960    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
1961        let f = f.splice_fn1_ctx(&self.location).into();
1962        let metadata = self.location.new_node_metadata::<T>();
1963        self.location
1964            .flow_state()
1965            .borrow_mut()
1966            .leaves
1967            .as_mut()
1968            .expect(FLOW_USED_MESSAGE)
1969            .push(HydroLeaf::ForEach {
1970                input: Box::new(HydroNode::Unpersist {
1971                    inner: Box::new(self.ir_node.into_inner()),
1972                    metadata: metadata.clone(),
1973                }),
1974                f,
1975                metadata,
1976            });
1977    }
1978
1979    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1980    where
1981        S: 'a + futures::Sink<T> + Unpin,
1982    {
1983        self.location
1984            .flow_state()
1985            .borrow_mut()
1986            .leaves
1987            .as_mut()
1988            .expect(FLOW_USED_MESSAGE)
1989            .push(HydroLeaf::DestSink {
1990                sink: sink.splice_typed_ctx(&self.location).into(),
1991                input: Box::new(self.ir_node.into_inner()),
1992                metadata: self.location.new_node_metadata::<T>(),
1993            });
1994    }
1995}
1996
1997impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
1998where
1999    L: Location<'a>,
2000{
2001    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2002        Stream::new(
2003            self.location.outer().clone(),
2004            HydroNode::Persist {
2005                inner: Box::new(self.ir_node.into_inner()),
2006                metadata: self.location.new_node_metadata::<T>(),
2007            },
2008        )
2009    }
2010
2011    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2012        Stream::new(
2013            Atomic {
2014                tick: self.location.clone(),
2015            },
2016            HydroNode::Persist {
2017                inner: Box::new(self.ir_node.into_inner()),
2018                metadata: self.location.new_node_metadata::<T>(),
2019            },
2020        )
2021    }
2022
2023    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2024    where
2025        T: Clone,
2026    {
2027        Stream::new(
2028            self.location.clone(),
2029            HydroNode::Persist {
2030                inner: Box::new(self.ir_node.into_inner()),
2031                metadata: self.location.new_node_metadata::<T>(),
2032            },
2033        )
2034    }
2035
2036    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2037        Stream::new(
2038            self.location.clone(),
2039            HydroNode::DeferTick {
2040                input: Box::new(self.ir_node.into_inner()),
2041                metadata: self.location.new_node_metadata::<T>(),
2042            },
2043        )
2044    }
2045
2046    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2047        Stream::new(
2048            self.location.clone(),
2049            HydroNode::Delta {
2050                inner: Box::new(self.ir_node.into_inner()),
2051                metadata: self.location.new_node_metadata::<T>(),
2052            },
2053        )
2054    }
2055}
2056
2057pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
2058    let root = get_this_crate();
2059
2060    if is_demux {
2061        parse_quote! {
2062            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
2063                |(id, data)| {
2064                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
2065                }
2066            )
2067        }
2068    } else {
2069        parse_quote! {
2070            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
2071                |data| {
2072                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
2073                }
2074            )
2075        }
2076    }
2077}
2078
2079fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
2080    serialize_bincode_with_type(is_demux, &stageleft::quote_type::<T>())
2081}
2082
2083pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
2084    let root = get_this_crate();
2085
2086    if let Some(c_type) = tagged {
2087        parse_quote! {
2088            |res| {
2089                let (id, b) = res.unwrap();
2090                (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
2091            }
2092        }
2093    } else {
2094        parse_quote! {
2095            |res| {
2096                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
2097            }
2098        }
2099    }
2100}
2101
2102pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
2103    deserialize_bincode_with_type(tagged, &stageleft::quote_type::<T>())
2104}
2105
2106impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2107where
2108    L: Location<'a> + NoTick,
2109{
2110    #[expect(
2111        clippy::type_complexity,
2112        reason = "Complex signatures for CanSend trait"
2113    )]
2114    pub fn send_bincode<L2, CoreType>(
2115        self,
2116        other: &L2,
2117    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, O::Min, R>
2118    where
2119        L::Root: CanSend<'a, L2, In<CoreType> = T>,
2120        L2: Location<'a>,
2121        CoreType: Serialize + DeserializeOwned,
2122        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2123    {
2124        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
2125
2126        let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(
2127            L::Root::tagged_type().as_ref(),
2128        ));
2129
2130        Stream::new(
2131            other.clone(),
2132            HydroNode::Network {
2133                from_key: None,
2134                to_location: other.id(),
2135                to_key: None,
2136                serialize_fn: serialize_pipeline.map(|e| e.into()),
2137                instantiate_fn: DebugInstantiate::Building,
2138                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2139                input: Box::new(self.ir_node.into_inner()),
2140                metadata: other.new_node_metadata::<CoreType>(),
2141            },
2142        )
2143    }
2144
2145    pub fn send_bincode_external<L2, CoreType>(
2146        self,
2147        other: &ExternalProcess<L2>,
2148    ) -> ExternalBincodeStream<L::Out<CoreType>>
2149    where
2150        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
2151        L2: 'a,
2152        CoreType: Serialize + DeserializeOwned,
2153        // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
2154    {
2155        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
2156
2157        let metadata = other.new_node_metadata::<CoreType>();
2158
2159        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2160
2161        let external_key = flow_state_borrow.next_external_out;
2162        flow_state_borrow.next_external_out += 1;
2163
2164        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2165
2166        let dummy_f: syn::Expr = syn::parse_quote!(());
2167
2168        leaves.push(HydroLeaf::ForEach {
2169            f: dummy_f.into(),
2170            input: Box::new(HydroNode::Network {
2171                from_key: None,
2172                to_location: other.id(),
2173                to_key: Some(external_key),
2174                serialize_fn: serialize_pipeline.map(|e| e.into()),
2175                instantiate_fn: DebugInstantiate::Building,
2176                deserialize_fn: None,
2177                input: Box::new(self.ir_node.into_inner()),
2178                metadata: metadata.clone(),
2179            }),
2180            metadata,
2181        });
2182
2183        ExternalBincodeStream {
2184            process_id: other.id,
2185            port_id: external_key,
2186            _phantom: PhantomData,
2187        }
2188    }
2189
2190    #[expect(
2191        clippy::type_complexity,
2192        reason = "Complex signatures for CanSend trait"
2193    )]
2194    pub fn send_bytes<L2>(
2195        self,
2196        other: &L2,
2197    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, O::Min, R>
2198    where
2199        L2: Location<'a>,
2200        L::Root: CanSend<'a, L2, In<Bytes> = T>,
2201        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2202    {
2203        let root = get_this_crate();
2204        Stream::new(
2205            other.clone(),
2206            HydroNode::Network {
2207                from_key: None,
2208                to_location: other.id(),
2209                to_key: None,
2210                serialize_fn: None,
2211                instantiate_fn: DebugInstantiate::Building,
2212                deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
2213                    let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
2214                    Some(expr.into())
2215                } else {
2216                    let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
2217                    Some(expr.into())
2218                },
2219                input: Box::new(self.ir_node.into_inner()),
2220                metadata: other.new_node_metadata::<Bytes>(),
2221            },
2222        )
2223    }
2224
2225    pub fn send_bytes_external<L2>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
2226    where
2227        L2: 'a,
2228        L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
2229    {
2230        let metadata = other.new_node_metadata::<Bytes>();
2231
2232        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2233        let external_key = flow_state_borrow.next_external_out;
2234        flow_state_borrow.next_external_out += 1;
2235
2236        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2237
2238        let dummy_f: syn::Expr = syn::parse_quote!(());
2239
2240        leaves.push(HydroLeaf::ForEach {
2241            f: dummy_f.into(),
2242            input: Box::new(HydroNode::Network {
2243                from_key: None,
2244                to_location: other.id(),
2245                to_key: Some(external_key),
2246                serialize_fn: None,
2247                instantiate_fn: DebugInstantiate::Building,
2248                deserialize_fn: None,
2249                input: Box::new(self.ir_node.into_inner()),
2250                metadata: metadata.clone(),
2251            }),
2252            metadata,
2253        });
2254
2255        ExternalBytesPort {
2256            process_id: other.id,
2257            port_id: external_key,
2258        }
2259    }
2260
2261    pub fn send_bincode_anonymous<L2, Tag, CoreType>(
2262        self,
2263        other: &L2,
2264    ) -> Stream<CoreType, L2, Unbounded, O::Min, R>
2265    where
2266        L2: Location<'a>,
2267        L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
2268        CoreType: Serialize + DeserializeOwned,
2269        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2270    {
2271        self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
2272    }
2273
2274    pub fn send_bytes_anonymous<L2, Tag>(
2275        self,
2276        other: &L2,
2277    ) -> Stream<Bytes, L2, Unbounded, O::Min, R>
2278    where
2279        L2: Location<'a>,
2280        L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
2281        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2282    {
2283        self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
2284    }
2285
2286    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2287    pub fn broadcast_bincode<C2>(
2288        self,
2289        other: &Cluster<'a, C2>,
2290    ) -> Stream<
2291        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2292        Cluster<'a, C2>,
2293        Unbounded,
2294        O::Min,
2295        R,
2296    >
2297    where
2298        C2: 'a,
2299        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2300        T: Clone + Serialize + DeserializeOwned,
2301        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2302    {
2303        let ids = other.members();
2304
2305        let to_send: Stream<(u32, Bytes), L, B, O, R> = self
2306            .map::<Bytes, _>(q!(|v| bincode::serialize(&v).unwrap().into()))
2307            .flat_map_ordered(q!(|v| { ids.iter().map(move |id| (id.raw_id, v.clone())) }));
2308
2309        let deserialize_pipeline = Some(deserialize_bincode::<T>(L::Root::tagged_type().as_ref()));
2310
2311        Stream::new(
2312            other.clone(),
2313            HydroNode::Network {
2314                from_key: None,
2315                to_location: other.id(),
2316                to_key: None,
2317                serialize_fn: None,
2318                instantiate_fn: DebugInstantiate::Building,
2319                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2320                input: Box::new(to_send.ir_node.into_inner()),
2321                metadata: other.new_node_metadata::<T>(),
2322            },
2323        )
2324    }
2325
2326    pub fn broadcast_bincode_anonymous<C2, Tag>(
2327        self,
2328        other: &Cluster<'a, C2>,
2329    ) -> Stream<T, Cluster<'a, C2>, Unbounded, O::Min, R>
2330    where
2331        C2: 'a,
2332        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
2333        T: Clone + Serialize + DeserializeOwned,
2334        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2335    {
2336        self.broadcast_bincode(other).map(q!(|(_, b)| b))
2337    }
2338
2339    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2340    pub fn broadcast_bytes<C2>(
2341        self,
2342        other: &Cluster<'a, C2>,
2343    ) -> Stream<
2344        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2345        Cluster<'a, C2>,
2346        Unbounded,
2347        O::Min,
2348        R,
2349    >
2350    where
2351        C2: 'a,
2352        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)>,
2353        T: Clone,
2354        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2355    {
2356        let ids = other.members();
2357
2358        self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2359            ::std::clone::Clone::clone(id),
2360            ::std::clone::Clone::clone(&b)
2361        ))))
2362        .send_bytes(other)
2363    }
2364
2365    pub fn broadcast_bytes_anonymous<C2, Tag>(
2366        self,
2367        other: &Cluster<'a, C2>,
2368    ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, O::Min, R>
2369    where
2370        C2: 'a,
2371        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2372            + 'a,
2373        T: Clone,
2374        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2375    {
2376        self.broadcast_bytes(other).map(q!(|(_, b)| b))
2377    }
2378}
2379
2380#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
2381impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
2382where
2383    L: Location<'a> + NoTick,
2384{
2385    pub fn round_robin_bincode<C2>(
2386        self,
2387        other: &Cluster<'a, C2>,
2388    ) -> Stream<
2389        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2390        Cluster<'a, C2>,
2391        Unbounded,
2392        <TotalOrder as MinOrder<
2393            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2394        >>::Min,
2395        ExactlyOnce,
2396    >
2397    where
2398        C2: 'a,
2399        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2400        T: Clone + Serialize + DeserializeOwned,
2401        TotalOrder:
2402            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2403    {
2404        let ids = other.members();
2405
2406        self.enumerate()
2407            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2408            .send_bincode(other)
2409    }
2410
2411    pub fn round_robin_bincode_anonymous<C2, Tag>(
2412        self,
2413        other: &Cluster<'a, C2>,
2414    ) -> Stream<
2415        T,
2416        Cluster<'a, C2>,
2417        Unbounded,
2418        <TotalOrder as MinOrder<
2419            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2420        >>::Min,
2421        ExactlyOnce,
2422    >
2423    where
2424        C2: 'a,
2425        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2426        T: Clone + Serialize + DeserializeOwned,
2427        TotalOrder:
2428            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2429    {
2430        self.round_robin_bincode(other).map(q!(|(_, b)| b))
2431    }
2432
2433    pub fn round_robin_bytes<C2>(
2434        self,
2435        other: &Cluster<'a, C2>,
2436    ) -> Stream<
2437        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2438        Cluster<'a, C2>,
2439        Unbounded,
2440        <TotalOrder as MinOrder<
2441            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2442        >>::Min,
2443        ExactlyOnce,
2444    >
2445    where
2446        C2: 'a,
2447        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2448        T: Clone,
2449        TotalOrder:
2450            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2451    {
2452        let ids = other.members();
2453
2454        self.enumerate()
2455            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2456            .send_bytes(other)
2457    }
2458
2459    pub fn round_robin_bytes_anonymous<C2, Tag>(
2460        self,
2461        other: &Cluster<'a, C2>,
2462    ) -> Stream<
2463        Bytes,
2464        Cluster<'a, C2>,
2465        Unbounded,
2466        <TotalOrder as MinOrder<
2467            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2468        >>::Min,
2469        ExactlyOnce,
2470    >
2471    where
2472        C2: 'a,
2473        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2474            + 'a,
2475        T: Clone,
2476        TotalOrder:
2477            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2478    {
2479        self.round_robin_bytes(other).map(q!(|(_, b)| b))
2480    }
2481}
2482
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    // Marker types used to tell the processes in the flow apart.
    struct P1 {}
    struct P2 {}

    /// Payload that is bincode-encoded on every network hop of the test.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends the numbers `0..10` from one process to a second one and on to an external
    /// process, then checks that the external side receives them in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external_process::<P2>();

        // first_node -> second_node -> external
        let wrapped = first_node
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }));
        let on_second = wrapped.send_bincode(&second_node);
        let out_port = on_second.send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the external source before starting the deployment.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let received = external_out.next().await.unwrap();
            assert_eq!(received.n, expected);
        }
    }
}