hydro_lang/
stream.rs

1use std::cell::RefCell;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::Deref;
5use std::rc::Rc;
6
7use dfir_rs::bytes::Bytes;
8use dfir_rs::futures;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21    CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
26/// Marks the stream as being totally ordered, which means that there are
27/// no sources of non-determinism (other than intentional ones) that will
28/// affect the order of elements.
29pub struct TotalOrder {}
30
31/// Marks the stream as having no order, which means that the order of
32/// elements may be affected by non-determinism.
33///
34/// This restricts certain operators, such as `fold` and `reduce`, to only
35/// be used with commutative aggregation functions.
36pub struct NoOrder {}
37
38/// Helper trait for determining the weakest of two orderings.
39#[sealed::sealed]
40pub trait MinOrder<Other> {
41    /// The weaker of the two orderings.
42    type Min;
43}
44
45#[sealed::sealed]
46impl<T> MinOrder<T> for T {
47    type Min = T;
48}
49
50#[sealed::sealed]
51impl MinOrder<NoOrder> for TotalOrder {
52    type Min = NoOrder;
53}
54
55#[sealed::sealed]
56impl MinOrder<TotalOrder> for NoOrder {
57    type Min = NoOrder;
58}
59
60/// An ordered sequence stream of elements of type `T`.
61///
62/// Type Parameters:
63/// - `T`: the type of elements in the stream
64/// - `L`: the location where the stream is being materialized
65/// - `B`: the boundedness of the stream, which is either [`Bounded`]
66///   or [`Unbounded`]
67/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
68///   or [`NoOrder`] (default is [`TotalOrder`])
69pub struct Stream<T, L, B, Order = TotalOrder> {
70    location: L,
71    pub(crate) ir_node: RefCell<HydroNode>,
72
73    _phantom: PhantomData<(T, L, B, Order)>,
74}
75
76impl<'a, T, L: Location<'a>, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O> {
77    fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
78        Stream {
79            location: stream.location,
80            ir_node: stream.ir_node,
81            _phantom: PhantomData,
82        }
83    }
84}
85
86impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder> {
87    fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
88        Stream {
89            location: stream.location,
90            ir_node: stream.ir_node,
91            _phantom: PhantomData,
92        }
93    }
94}
95
96impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
97    fn location_kind(&self) -> LocationId {
98        self.location.id()
99    }
100}
101
102impl<'a, T, L: Location<'a>, Order> DeferTick for Stream<T, Tick<L>, Bounded, Order> {
103    fn defer_tick(self) -> Self {
104        Stream::defer_tick(self)
105    }
106}
107
108impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker>
109    for Stream<T, Tick<L>, Bounded, Order>
110{
111    type Location = Tick<L>;
112
113    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
114        let location_id = location.id();
115        Stream::new(
116            location.clone(),
117            HydroNode::CycleSource {
118                ident,
119                location_kind: location_id,
120                metadata: location.new_node_metadata::<T>(),
121            },
122        )
123    }
124}
125
126impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker>
127    for Stream<T, Tick<L>, Bounded, Order>
128{
129    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
130        assert_eq!(
131            self.location.id(),
132            expected_location,
133            "locations do not match"
134        );
135        self.location
136            .flow_state()
137            .borrow_mut()
138            .leaves
139            .as_mut()
140            .expect(FLOW_USED_MESSAGE)
141            .push(HydroLeaf::CycleSink {
142                ident,
143                location_kind: self.location_kind(),
144                input: Box::new(self.ir_node.into_inner()),
145                metadata: self.location.new_node_metadata::<T>(),
146            });
147    }
148}
149
150impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker>
151    for Stream<T, L, B, Order>
152{
153    type Location = L;
154
155    fn create_source(ident: syn::Ident, location: L) -> Self {
156        let location_id = location.id();
157
158        Stream::new(
159            location.clone(),
160            HydroNode::Persist {
161                inner: Box::new(HydroNode::CycleSource {
162                    ident,
163                    location_kind: location_id,
164                    metadata: location.new_node_metadata::<T>(),
165                }),
166                metadata: location.new_node_metadata::<T>(),
167            },
168        )
169    }
170}
171
172impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker>
173    for Stream<T, L, B, Order>
174{
175    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
176        assert_eq!(
177            self.location.id(),
178            expected_location,
179            "locations do not match"
180        );
181        let metadata = self.location.new_node_metadata::<T>();
182        self.location
183            .flow_state()
184            .borrow_mut()
185            .leaves
186            .as_mut()
187            .expect(FLOW_USED_MESSAGE)
188            .push(HydroLeaf::CycleSink {
189                ident,
190                location_kind: self.location_kind(),
191                input: Box::new(HydroNode::Unpersist {
192                    inner: Box::new(self.ir_node.into_inner()),
193                    metadata: metadata.clone(),
194                }),
195                metadata,
196            });
197    }
198}
199
200impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
201    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
202        Stream {
203            location,
204            ir_node: RefCell::new(ir_node),
205            _phantom: PhantomData,
206        }
207    }
208}
209
210impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream<T, L, B, Order> {
211    fn clone(&self) -> Self {
212        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
213            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
214            *self.ir_node.borrow_mut() = HydroNode::Tee {
215                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
216                metadata: self.location.new_node_metadata::<T>(),
217            };
218        }
219
220        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
221            Stream {
222                location: self.location.clone(),
223                ir_node: HydroNode::Tee {
224                    inner: TeeNode(inner.0.clone()),
225                    metadata: metadata.clone(),
226                }
227                .into(),
228                _phantom: PhantomData,
229            }
230        } else {
231            unreachable!()
232        }
233    }
234}
235
236impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
237    /// Produces a stream based on invoking `f` on each element in order.
238    /// If you do not want to modify the stream and instead only want to view
239    /// each item use [`Stream::inspect`] instead.
240    ///
241    /// # Example
242    /// ```rust
243    /// # use hydro_lang::*;
244    /// # use dfir_rs::futures::StreamExt;
245    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
246    /// let words = process.source_iter(q!(vec!["hello", "world"]));
247    /// words.map(q!(|x| x.to_uppercase()))
248    /// # }, |mut stream| async move {
249    /// # for w in vec!["HELLO", "WORLD"] {
250    /// #     assert_eq!(stream.next().await.unwrap(), w);
251    /// # }
252    /// # }));
253    /// ```
254    pub fn map<U, F: Fn(T) -> U + 'a>(
255        self,
256        f: impl IntoQuotedMut<'a, F, L>,
257    ) -> Stream<U, L, B, Order> {
258        let f = f.splice_fn1_ctx(&self.location).into();
259        Stream::new(
260            self.location.clone(),
261            HydroNode::Map {
262                f,
263                input: Box::new(self.ir_node.into_inner()),
264                metadata: self.location.new_node_metadata::<U>(),
265            },
266        )
267    }
268
269    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
270    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
271    /// for the output type `U` must produce items in a **deterministic** order.
272    ///
273    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
274    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
275    ///
276    /// # Example
277    /// ```rust
278    /// # use hydro_lang::*;
279    /// # use dfir_rs::futures::StreamExt;
280    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
281    /// process
282    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
283    ///     .flat_map_ordered(q!(|x| x))
284    /// # }, |mut stream| async move {
285    /// // 1, 2, 3, 4
286    /// # for w in (1..5) {
287    /// #     assert_eq!(stream.next().await.unwrap(), w);
288    /// # }
289    /// # }));
290    /// ```
291    pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
292        self,
293        f: impl IntoQuotedMut<'a, F, L>,
294    ) -> Stream<U, L, B, Order> {
295        let f = f.splice_fn1_ctx(&self.location).into();
296        Stream::new(
297            self.location.clone(),
298            HydroNode::FlatMap {
299                f,
300                input: Box::new(self.ir_node.into_inner()),
301                metadata: self.location.new_node_metadata::<U>(),
302            },
303        )
304    }
305
306    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
307    /// for the output type `U` to produce items in any order.
308    ///
309    /// # Example
310    /// ```rust
311    /// # use hydro_lang::*;
312    /// # use dfir_rs::futures::StreamExt;
313    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
314    /// process
315    ///     .source_iter(q!(vec![
316    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
317    ///         std::collections::HashSet::from_iter(vec![3, 4]),
318    ///     ]))
319    ///     .flat_map_unordered(q!(|x| x))
320    /// # }, |mut stream| async move {
321    /// // 1, 2, 3, 4, but in no particular order
322    /// # let mut results = Vec::new();
323    /// # for w in (1..5) {
324    /// #     results.push(stream.next().await.unwrap());
325    /// # }
326    /// # results.sort();
327    /// # assert_eq!(results, vec![1, 2, 3, 4]);
328    /// # }));
329    /// ```
330    pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
331        self,
332        f: impl IntoQuotedMut<'a, F, L>,
333    ) -> Stream<U, L, B, NoOrder> {
334        let f = f.splice_fn1_ctx(&self.location).into();
335        Stream::new(
336            self.location.clone(),
337            HydroNode::FlatMap {
338                f,
339                input: Box::new(self.ir_node.into_inner()),
340                metadata: self.location.new_node_metadata::<U>(),
341            },
342        )
343    }
344
345    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
346    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
347    ///
348    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
349    /// not deterministic, use [`Stream::flatten_unordered`] instead.
350    ///
351    /// ```rust
352    /// # use hydro_lang::*;
353    /// # use dfir_rs::futures::StreamExt;
354    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
355    /// process
356    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
357    ///     .flatten_ordered()
358    /// # }, |mut stream| async move {
359    /// // 1, 2, 3, 4
360    /// # for w in (1..5) {
361    /// #     assert_eq!(stream.next().await.unwrap(), w);
362    /// # }
363    /// # }));
364    /// ```
365    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, Order>
366    where
367        T: IntoIterator<Item = U>,
368    {
369        self.flat_map_ordered(q!(|d| d))
370    }
371
372    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
373    /// for the element type `T` to produce items in any order.
374    ///
375    /// # Example
376    /// ```rust
377    /// # use hydro_lang::*;
378    /// # use dfir_rs::futures::StreamExt;
379    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
380    /// process
381    ///     .source_iter(q!(vec![
382    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
383    ///         std::collections::HashSet::from_iter(vec![3, 4]),
384    ///     ]))
385    ///     .flatten_unordered()
386    /// # }, |mut stream| async move {
387    /// // 1, 2, 3, 4, but in no particular order
388    /// # let mut results = Vec::new();
389    /// # for w in (1..5) {
390    /// #     results.push(stream.next().await.unwrap());
391    /// # }
392    /// # results.sort();
393    /// # assert_eq!(results, vec![1, 2, 3, 4]);
394    /// # }));
395    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
396    where
397        T: IntoIterator<Item = U>,
398    {
399        self.flat_map_unordered(q!(|d| d))
400    }
401
402    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
403    /// `f`, preserving the order of the elements.
404    ///
405    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
406    /// not modify or take ownership of the values. If you need to modify the values while filtering
407    /// use [`Stream::filter_map`] instead.
408    ///
409    /// # Example
410    /// ```rust
411    /// # use hydro_lang::*;
412    /// # use dfir_rs::futures::StreamExt;
413    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
414    /// process
415    ///     .source_iter(q!(vec![1, 2, 3, 4]))
416    ///     .filter(q!(|&x| x > 2))
417    /// # }, |mut stream| async move {
418    /// // 3, 4
419    /// # for w in (3..5) {
420    /// #     assert_eq!(stream.next().await.unwrap(), w);
421    /// # }
422    /// # }));
423    /// ```
424    pub fn filter<F: Fn(&T) -> bool + 'a>(
425        self,
426        f: impl IntoQuotedMut<'a, F, L>,
427    ) -> Stream<T, L, B, Order> {
428        let f = f.splice_fn1_borrow_ctx(&self.location).into();
429        Stream::new(
430            self.location.clone(),
431            HydroNode::Filter {
432                f,
433                input: Box::new(self.ir_node.into_inner()),
434                metadata: self.location.new_node_metadata::<T>(),
435            },
436        )
437    }
438
439    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
440    ///
441    /// # Example
442    /// ```rust
443    /// # use hydro_lang::*;
444    /// # use dfir_rs::futures::StreamExt;
445    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
446    /// process
447    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
448    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
449    /// # }, |mut stream| async move {
450    /// // 1, 2
451    /// # for w in (1..3) {
452    /// #     assert_eq!(stream.next().await.unwrap(), w);
453    /// # }
454    /// # }));
455    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
456        self,
457        f: impl IntoQuotedMut<'a, F, L>,
458    ) -> Stream<U, L, B, Order> {
459        let f = f.splice_fn1_ctx(&self.location).into();
460        Stream::new(
461            self.location.clone(),
462            HydroNode::FilterMap {
463                f,
464                input: Box::new(self.ir_node.into_inner()),
465                metadata: self.location.new_node_metadata::<U>(),
466            },
467        )
468    }
469
470    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
471    /// where `x` is the final value of `other`, a bounded [`Singleton`].
472    ///
473    /// # Example
474    /// ```rust
475    /// # use hydro_lang::*;
476    /// # use dfir_rs::futures::StreamExt;
477    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
478    /// let tick = process.tick();
479    /// let batch = unsafe {
480    ///     process
481    ///         .source_iter(q!(vec![1, 2, 3, 4]))
482    ///         .tick_batch(&tick)
483    /// };
484    /// let count = batch.clone().count(); // `count()` returns a singleton
485    /// batch.cross_singleton(count).all_ticks()
486    /// # }, |mut stream| async move {
487    /// // (1, 4), (2, 4), (3, 4), (4, 4)
488    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    pub fn cross_singleton<O>(
493        self,
494        other: impl Into<Optional<O, L, Bounded>>,
495    ) -> Stream<(T, O), L, B, Order>
496    where
497        O: Clone,
498    {
499        let other: Optional<O, L, Bounded> = other.into();
500        check_matching_location(&self.location, &other.location);
501
502        Stream::new(
503            self.location.clone(),
504            HydroNode::CrossSingleton {
505                left: Box::new(self.ir_node.into_inner()),
506                right: Box::new(other.ir_node.into_inner()),
507                metadata: self.location.new_node_metadata::<(T, O)>(),
508            },
509        )
510    }
511
512    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
513    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
514        self.cross_singleton(signal.map(q!(|_u| ())))
515            .map(q!(|(d, _signal)| d))
516    }
517
518    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
519    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
520        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
521    }
522
523    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
524    /// tupled pairs in a non-deterministic order.
525    ///
526    /// # Example
527    /// ```rust
528    /// # use hydro_lang::*;
529    /// # use std::collections::HashSet;
530    /// # use dfir_rs::futures::StreamExt;
531    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
532    /// let tick = process.tick();
533    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
534    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
535    /// stream1.cross_product(stream2)
536    /// # }, |mut stream| async move {
537    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
538    /// # stream.map(|i| assert!(expected.contains(&i)));
539    /// # }));
540    pub fn cross_product<O>(self, other: Stream<O, L, B, Order>) -> Stream<(T, O), L, B, NoOrder>
541    where
542        T: Clone,
543        O: Clone,
544    {
545        check_matching_location(&self.location, &other.location);
546
547        Stream::new(
548            self.location.clone(),
549            HydroNode::CrossProduct {
550                left: Box::new(self.ir_node.into_inner()),
551                right: Box::new(other.ir_node.into_inner()),
552                metadata: self.location.new_node_metadata::<(T, O)>(),
553            },
554        )
555    }
556
557    /// Takes one stream as input and filters out any duplicate occurrences. The output
558    /// contains all unique values from the input.
559    ///
560    /// # Example
561    /// ```rust
562    /// # use hydro_lang::*;
563    /// # use dfir_rs::futures::StreamExt;
564    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
565    /// let tick = process.tick();
566    ///     process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
567    /// # }, |mut stream| async move {
568    /// # for w in vec![1, 2, 3, 4] {
569    /// #     assert_eq!(stream.next().await.unwrap(), w);
570    /// # }
571    /// # }));
572    pub fn unique(self) -> Stream<T, L, B, Order>
573    where
574        T: Eq + Hash,
575    {
576        Stream::new(
577            self.location.clone(),
578            HydroNode::Unique {
579                input: Box::new(self.ir_node.into_inner()),
580                metadata: self.location.new_node_metadata::<T>(),
581            },
582        )
583    }
584
585    /// Outputs everything in this stream that is *not* contained in the `other` stream.
586    ///
587    /// The `other` stream must be [`Bounded`], since this function will wait until
588    /// all its elements are available before producing any output.
589    /// # Example
590    /// ```rust
591    /// # use hydro_lang::*;
592    /// # use dfir_rs::futures::StreamExt;
593    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
594    /// let tick = process.tick();
595    /// let stream = unsafe {
596    ///    process
597    ///    .source_iter(q!(vec![ 1, 2, 3, 4 ]))
598    ///    .tick_batch(&tick)
599    /// };
600    /// let batch = unsafe {
601    ///     process
602    ///         .source_iter(q!(vec![1, 2]))
603    ///         .tick_batch(&tick)
604    /// };
605    /// stream.filter_not_in(batch).all_ticks()
606    /// # }, |mut stream| async move {
607    /// # for w in vec![3, 4] {
608    /// #     assert_eq!(stream.next().await.unwrap(), w);
609    /// # }
610    /// # }));
611    pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order>
612    where
613        T: Eq + Hash,
614    {
615        check_matching_location(&self.location, &other.location);
616
617        Stream::new(
618            self.location.clone(),
619            HydroNode::Difference {
620                pos: Box::new(self.ir_node.into_inner()),
621                neg: Box::new(other.ir_node.into_inner()),
622                metadata: self.location.new_node_metadata::<T>(),
623            },
624        )
625    }
626
627    /// An operator which allows you to "inspect" each element of a stream without
628    /// modifying it. The closure `f` is called on a reference to each item. This is
629    /// mainly useful for debugging, and should not be used to generate side-effects.
630    ///
631    /// # Example
632    /// ```rust
633    /// # use hydro_lang::*;
634    /// # use dfir_rs::futures::StreamExt;
635    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
636    /// let nums = process.source_iter(q!(vec![1, 2]));
637    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
638    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
639    /// # }, |mut stream| async move {
640    /// # for w in vec![1, 2] {
641    /// #     assert_eq!(stream.next().await.unwrap(), w);
642    /// # }
643    /// # }));
644    /// ```
645    pub fn inspect<F: Fn(&T) + 'a>(
646        self,
647        f: impl IntoQuotedMut<'a, F, L>,
648    ) -> Stream<T, L, B, Order> {
649        let f = f.splice_fn1_borrow_ctx(&self.location).into();
650
651        if L::is_top_level() {
652            Stream::new(
653                self.location.clone(),
654                HydroNode::Persist {
655                    inner: Box::new(HydroNode::Inspect {
656                        f,
657                        input: Box::new(HydroNode::Unpersist {
658                            inner: Box::new(self.ir_node.into_inner()),
659                            metadata: self.location.new_node_metadata::<T>(),
660                        }),
661                        metadata: self.location.new_node_metadata::<T>(),
662                    }),
663                    metadata: self.location.new_node_metadata::<T>(),
664                },
665            )
666        } else {
667            Stream::new(
668                self.location.clone(),
669                HydroNode::Inspect {
670                    f,
671                    input: Box::new(self.ir_node.into_inner()),
672                    metadata: self.location.new_node_metadata::<T>(),
673                },
674            )
675        }
676    }
677
678    /// Explicitly "casts" the stream to a type with a different ordering
679    /// guarantee. Useful in unsafe code where the ordering cannot be proven
680    /// by the type-system.
681    ///
682    /// # Safety
683    /// This function is used as an escape hatch, and any mistakes in the
684    /// provided ordering guarantee will propagate into the guarantees
685    /// for the rest of the program.
686    ///
687    /// # Example
688    /// # TODO: more sensible code after Shadaj merges
689    /// ```rust
690    /// # use hydro_lang::*;
691    /// # use std::collections::HashSet;
692    /// # use dfir_rs::futures::StreamExt;
693    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
694    /// let nums = process.source_iter(q!({
695    ///     let now = std::time::SystemTime::now();
696    ///     match now.elapsed().unwrap().as_secs() % 2 {
697    ///         0 => vec![5, 4, 3, 2, 1],
698    ///         _ => vec![1, 2, 3, 4, 5],
699    ///     }
700    ///     .into_iter()
701    /// }));
702    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
703    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
704    /// stream
705    /// # }, |mut stream| async move {
706    /// # for w in vec![1, 2, 3, 4, 5] {
707    /// #     assert!((1..=5).contains(&stream.next().await.unwrap()));
708    /// # }
709    /// # }));
710    /// ```
711    pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O> {
712        Stream::new(self.location, self.ir_node.into_inner())
713    }
714}
715
716impl<'a, T, L: Location<'a>, B, Order> Stream<&T, L, B, Order> {
717    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
718    ///
719    /// # Example
720    /// ```rust
721    /// # use hydro_lang::*;
722    /// # use dfir_rs::futures::StreamExt;
723    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
724    /// process.source_iter(q!(&[1, 2, 3])).cloned()
725    /// # }, |mut stream| async move {
726    /// // 1, 2, 3
727    /// # for w in vec![1, 2, 3] {
728    /// #     assert_eq!(stream.next().await.unwrap(), w);
729    /// # }
730    /// # }));
731    /// ```
732    pub fn cloned(self) -> Stream<T, L, B, Order>
733    where
734        T: Clone,
735    {
736        self.map(q!(|d| d.clone()))
737    }
738}
739
740impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
741where
742    Order: MinOrder<NoOrder, Min = NoOrder>,
743{
744    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
745    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
746    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
747    ///
748    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
749    ///
750    /// # Example
751    /// ```rust
752    /// # use hydro_lang::*;
753    /// # use dfir_rs::futures::StreamExt;
754    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
755    /// let tick = process.tick();
756    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
757    /// let batch = unsafe { numbers.tick_batch(&tick) };
758    /// batch
759    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
760    ///     .all_ticks()
761    /// # }, |mut stream| async move {
762    /// // 10
763    /// # assert_eq!(stream.next().await.unwrap(), 10);
764    /// # }));
765    /// ```
766    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
767        self,
768        init: impl IntoQuotedMut<'a, I, L>,
769        comb: impl IntoQuotedMut<'a, F, L>,
770    ) -> Singleton<A, L, B> {
771        let init = init.splice_fn0_ctx(&self.location).into();
772        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
773
774        let mut core = HydroNode::Fold {
775            init,
776            acc: comb,
777            input: Box::new(self.ir_node.into_inner()),
778            metadata: self.location.new_node_metadata::<A>(),
779        };
780
781        if L::is_top_level() {
782            // top-level (possibly unbounded) singletons are represented as
783            // a stream which produces all values from all ticks every tick,
784            // so Unpersist will always give the lastest aggregation
785            core = HydroNode::Persist {
786                inner: Box::new(core),
787                metadata: self.location.new_node_metadata::<A>(),
788            };
789        }
790
791        Singleton::new(self.location, core)
792    }
793
794    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
795    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
796    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
797    /// reference, so that it can be modified in place.
798    ///
799    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
800    ///
801    /// # Example
802    /// ```rust
803    /// # use hydro_lang::*;
804    /// # use dfir_rs::futures::StreamExt;
805    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
806    /// let tick = process.tick();
807    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
808    /// let batch = unsafe { numbers.tick_batch(&tick) };
809    /// batch
810    ///     .reduce_commutative(q!(|curr, new| *curr += new))
811    ///     .all_ticks()
812    /// # }, |mut stream| async move {
813    /// // 10
814    /// # assert_eq!(stream.next().await.unwrap(), 10);
815    /// # }));
816    /// ```
817    pub fn reduce_commutative<F: Fn(&mut T, T) + 'a>(
818        self,
819        comb: impl IntoQuotedMut<'a, F, L>,
820    ) -> Optional<T, L, B> {
821        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
822        let mut core = HydroNode::Reduce {
823            f,
824            input: Box::new(self.ir_node.into_inner()),
825            metadata: self.location.new_node_metadata::<T>(),
826        };
827
828        if L::is_top_level() {
829            core = HydroNode::Persist {
830                inner: Box::new(core),
831                metadata: self.location.new_node_metadata::<T>(),
832            };
833        }
834
835        Optional::new(self.location, core)
836    }
837
838    /// Computes the maximum element in the stream as an [`Optional`], which
839    /// will be empty until the first element in the input arrives.
840    ///
841    /// # Example
842    /// ```rust
843    /// # use hydro_lang::*;
844    /// # use dfir_rs::futures::StreamExt;
845    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
846    /// let tick = process.tick();
847    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
848    /// let batch = unsafe { numbers.tick_batch(&tick) };
849    /// batch.max().all_ticks()
850    /// # }, |mut stream| async move {
851    /// // 4
852    /// # assert_eq!(stream.next().await.unwrap(), 4);
853    /// # }));
854    /// ```
855    pub fn max(self) -> Optional<T, L, B>
856    where
857        T: Ord,
858    {
859        self.reduce_commutative(q!(|curr, new| {
860            if new > *curr {
861                *curr = new;
862            }
863        }))
864    }
865
866    /// Computes the maximum element in the stream as an [`Optional`], where the
867    /// maximum is determined according to the `key` function. The [`Optional`] will
868    /// be empty until the first element in the input arrives.
869    ///
870    /// # Example
871    /// ```rust
872    /// # use hydro_lang::*;
873    /// # use dfir_rs::futures::StreamExt;
874    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
875    /// let tick = process.tick();
876    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
877    /// let batch = unsafe { numbers.tick_batch(&tick) };
878    /// batch.max_by_key(q!(|x| -x)).all_ticks()
879    /// # }, |mut stream| async move {
880    /// // 1
881    /// # assert_eq!(stream.next().await.unwrap(), 1);
882    /// # }));
883    /// ```
884    pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>(
885        self,
886        key: impl IntoQuotedMut<'a, F, L> + Copy,
887    ) -> Optional<T, L, B> {
888        let f = key.splice_fn1_borrow_ctx(&self.location);
889
890        let wrapped: syn::Expr = parse_quote!({
891            let key_fn = #f;
892            move |curr, new| {
893                if key_fn(&new) > key_fn(&*curr) {
894                    *curr = new;
895                }
896            }
897        });
898
899        let mut core = HydroNode::Reduce {
900            f: wrapped.into(),
901            input: Box::new(self.ir_node.into_inner()),
902            metadata: self.location.new_node_metadata::<T>(),
903        };
904
905        if L::is_top_level() {
906            core = HydroNode::Persist {
907                inner: Box::new(core),
908                metadata: self.location.new_node_metadata::<T>(),
909            };
910        }
911
912        Optional::new(self.location, core)
913    }
914
915    /// Computes the minimum element in the stream as an [`Optional`], which
916    /// will be empty until the first element in the input arrives.
917    ///
918    /// # Example
919    /// ```rust
920    /// # use hydro_lang::*;
921    /// # use dfir_rs::futures::StreamExt;
922    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
923    /// let tick = process.tick();
924    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
925    /// let batch = unsafe { numbers.tick_batch(&tick) };
926    /// batch.min().all_ticks()
927    /// # }, |mut stream| async move {
928    /// // 1
929    /// # assert_eq!(stream.next().await.unwrap(), 1);
930    /// # }));
931    /// ```
932    pub fn min(self) -> Optional<T, L, B>
933    where
934        T: Ord,
935    {
936        self.reduce_commutative(q!(|curr, new| {
937            if new < *curr {
938                *curr = new;
939            }
940        }))
941    }
942
943    /// Computes the number of elements in the stream as a [`Singleton`].
944    ///
945    /// # Example
946    /// ```rust
947    /// # use hydro_lang::*;
948    /// # use dfir_rs::futures::StreamExt;
949    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
950    /// let tick = process.tick();
951    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
952    /// let batch = unsafe { numbers.tick_batch(&tick) };
953    /// batch.count().all_ticks()
954    /// # }, |mut stream| async move {
955    /// // 4
956    /// # assert_eq!(stream.next().await.unwrap(), 4);
957    /// # }));
958    /// ```
959    pub fn count(self) -> Singleton<usize, L, B> {
960        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
961    }
962}
963
964impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
965    /// Returns a stream with the current count tupled with each element in the input stream.
966    ///
967    /// # Example
968    /// ```rust
969    /// # use hydro_lang::*;
970    /// # use dfir_rs::futures::StreamExt;
971    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
972    /// let tick = process.tick();
973    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
974    /// numbers.enumerate()
975    /// # }, |mut stream| async move {
976    /// // (0, 1), (1, 2), (2, 3), (3, 4)
977    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
978    /// #     assert_eq!(stream.next().await.unwrap(), w);
979    /// # }
980    /// # }));
981    /// ```
982    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
983        if L::is_top_level() {
984            Stream::new(
985                self.location.clone(),
986                HydroNode::Persist {
987                    inner: Box::new(HydroNode::Enumerate {
988                        is_static: true,
989                        input: Box::new(HydroNode::Unpersist {
990                            inner: Box::new(self.ir_node.into_inner()),
991                            metadata: self.location.new_node_metadata::<T>(),
992                        }),
993                        metadata: self.location.new_node_metadata::<(usize, T)>(),
994                    }),
995                    metadata: self.location.new_node_metadata::<(usize, T)>(),
996                },
997            )
998        } else {
999            Stream::new(
1000                self.location.clone(),
1001                HydroNode::Enumerate {
1002                    is_static: false,
1003                    input: Box::new(self.ir_node.into_inner()),
1004                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1005                },
1006            )
1007        }
1008    }
1009
1010    /// Computes the first element in the stream as an [`Optional`], which
1011    /// will be empty until the first element in the input arrives.
1012    ///
1013    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1014    /// re-ordering of elements may cause the first element to change.
1015    ///
1016    /// # Example
1017    /// ```rust
1018    /// # use hydro_lang::*;
1019    /// # use dfir_rs::futures::StreamExt;
1020    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1021    /// let tick = process.tick();
1022    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1023    /// let batch = unsafe { numbers.tick_batch(&tick) };
1024    /// batch.first().all_ticks()
1025    /// # }, |mut stream| async move {
1026    /// // 1
1027    /// # assert_eq!(stream.next().await.unwrap(), 1);
1028    /// # }));
1029    /// ```
1030    pub fn first(self) -> Optional<T, L, B> {
1031        Optional::new(self.location, self.ir_node.into_inner())
1032    }
1033
1034    /// Computes the last element in the stream as an [`Optional`], which
1035    /// will be empty until an element in the input arrives.
1036    ///
1037    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1038    /// re-ordering of elements may cause the last element to change.
1039    ///
1040    /// # Example
1041    /// ```rust
1042    /// # use hydro_lang::*;
1043    /// # use dfir_rs::futures::StreamExt;
1044    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1045    /// let tick = process.tick();
1046    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1047    /// let batch = unsafe { numbers.tick_batch(&tick) };
1048    /// batch.last().all_ticks()
1049    /// # }, |mut stream| async move {
1050    /// // 4
1051    /// # assert_eq!(stream.next().await.unwrap(), 4);
1052    /// # }));
1053    /// ```
1054    pub fn last(self) -> Optional<T, L, B> {
1055        self.reduce(q!(|curr, new| *curr = new))
1056    }
1057
1058    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1059    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1060    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1061    ///
1062    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1063    /// to depend on the order of elements in the stream.
1064    ///
1065    /// # Example
1066    /// ```rust
1067    /// # use hydro_lang::*;
1068    /// # use dfir_rs::futures::StreamExt;
1069    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1070    /// let tick = process.tick();
1071    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1072    /// let batch = unsafe { words.tick_batch(&tick) };
1073    /// batch
1074    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1075    ///     .all_ticks()
1076    /// # }, |mut stream| async move {
1077    /// // "HELLOWORLD"
1078    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1079    /// # }));
1080    /// ```
1081    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1082        self,
1083        init: impl IntoQuotedMut<'a, I, L>,
1084        comb: impl IntoQuotedMut<'a, F, L>,
1085    ) -> Singleton<A, L, B> {
1086        let init = init.splice_fn0_ctx(&self.location).into();
1087        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1088
1089        let mut core = HydroNode::Fold {
1090            init,
1091            acc: comb,
1092            input: Box::new(self.ir_node.into_inner()),
1093            metadata: self.location.new_node_metadata::<A>(),
1094        };
1095
1096        if L::is_top_level() {
1097            // top-level (possibly unbounded) singletons are represented as
1098            // a stream which produces all values from all ticks every tick,
1099            // so Unpersist will always give the lastest aggregation
1100            core = HydroNode::Persist {
1101                inner: Box::new(core),
1102                metadata: self.location.new_node_metadata::<A>(),
1103            };
1104        }
1105
1106        Singleton::new(self.location, core)
1107    }
1108
1109    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1110    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1111    /// until the first element in the input arrives.
1112    ///
1113    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1114    /// to depend on the order of elements in the stream.
1115    ///
1116    /// # Example
1117    /// ```rust
1118    /// # use hydro_lang::*;
1119    /// # use dfir_rs::futures::StreamExt;
1120    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1121    /// let tick = process.tick();
1122    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1123    /// let batch = unsafe { words.tick_batch(&tick) };
1124    /// batch
1125    ///     .map(q!(|x| x.to_string()))
1126    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1127    ///     .all_ticks()
1128    /// # }, |mut stream| async move {
1129    /// // "HELLOWORLD"
1130    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1131    /// # }));
1132    /// ```
1133    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1134        self,
1135        comb: impl IntoQuotedMut<'a, F, L>,
1136    ) -> Optional<T, L, B> {
1137        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1138        let mut core = HydroNode::Reduce {
1139            f,
1140            input: Box::new(self.ir_node.into_inner()),
1141            metadata: self.location.new_node_metadata::<T>(),
1142        };
1143
1144        if L::is_top_level() {
1145            core = HydroNode::Persist {
1146                inner: Box::new(core),
1147                metadata: self.location.new_node_metadata::<T>(),
1148            };
1149        }
1150
1151        Optional::new(self.location, core)
1152    }
1153}
1154
1155impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream<T, L, Unbounded, O> {
1156    /// Produces a new stream that interleaves the elements of the two input streams.
1157    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1158    ///
1159    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1160    /// [`Bounded`], you can use [`Stream::chain`] instead.
1161    ///
1162    /// # Example
1163    /// ```rust
1164    /// # use hydro_lang::*;
1165    /// # use dfir_rs::futures::StreamExt;
1166    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1167    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1168    /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
1169    /// # }, |mut stream| async move {
1170    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1171    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1172    /// #     assert_eq!(stream.next().await.unwrap(), w);
1173    /// # }
1174    /// # }));
1175    /// ```
1176    pub fn union<O2>(self, other: Stream<T, L, Unbounded, O2>) -> Stream<T, L, Unbounded, NoOrder> {
1177        let tick = self.location.tick();
1178        unsafe {
1179            // SAFETY: Because the outputs are unordered,
1180            // we can interleave batches from both streams.
1181            self.tick_batch(&tick)
1182                .assume_ordering::<NoOrder>()
1183                .chain(other.tick_batch(&tick).assume_ordering::<NoOrder>())
1184                .all_ticks()
1185                .assume_ordering()
1186        }
1187    }
1188}
1189
1190impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
1191    /// Produces a new stream that emits the input elements in sorted order.
1192    ///
1193    /// The input stream can have any ordering guarantee, but the output stream
1194    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1195    /// elements in the input stream are available, so it requires the input stream
1196    /// to be [`Bounded`].
1197    ///
1198    /// # Example
1199    /// ```rust
1200    /// # use hydro_lang::*;
1201    /// # use dfir_rs::futures::StreamExt;
1202    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1203    /// let tick = process.tick();
1204    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1205    /// let batch = unsafe { numbers.tick_batch(&tick) };
1206    /// batch.sort().all_ticks()
1207    /// # }, |mut stream| async move {
1208    /// // 1, 2, 3, 4
1209    /// # for w in (1..5) {
1210    /// #     assert_eq!(stream.next().await.unwrap(), w);
1211    /// # }
1212    /// # }));
1213    /// ```
1214    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
1215    where
1216        T: Ord,
1217    {
1218        Stream::new(
1219            self.location.clone(),
1220            HydroNode::Sort {
1221                input: Box::new(self.ir_node.into_inner()),
1222                metadata: self.location.new_node_metadata::<T>(),
1223            },
1224        )
1225    }
1226
1227    /// Produces a new stream that first emits the elements of the `self` stream,
1228    /// and then emits the elements of the `other` stream. The output stream has
1229    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1230    /// [`TotalOrder`] guarantee.
1231    ///
1232    /// Currently, both input streams must be [`Bounded`]. This operator will block
1233    /// on the first stream until all its elements are available. In a future version,
1234    /// we will relax the requirement on the `other` stream.
1235    ///
1236    /// # Example
1237    /// ```rust
1238    /// # use hydro_lang::*;
1239    /// # use dfir_rs::futures::StreamExt;
1240    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1241    /// let tick = process.tick();
1242    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1243    /// let batch = unsafe { numbers.tick_batch(&tick) };
1244    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1245    /// # }, |mut stream| async move {
1246    /// // 2, 3, 4, 5, 1, 2, 3, 4
1247    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1248    /// #     assert_eq!(stream.next().await.unwrap(), w);
1249    /// # }
1250    /// # }));
1251    /// ```
1252    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order::Min>
1253    where
1254        Order: MinOrder<O2>,
1255    {
1256        check_matching_location(&self.location, &other.location);
1257
1258        Stream::new(
1259            self.location.clone(),
1260            HydroNode::Chain {
1261                first: Box::new(self.ir_node.into_inner()),
1262                second: Box::new(other.ir_node.into_inner()),
1263                metadata: self.location.new_node_metadata::<T>(),
1264            },
1265        )
1266    }
1267}
1268
1269impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> {
1270    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1271    /// by equi-joining the two streams on the key attribute `K`.
1272    ///
1273    /// # Example
1274    /// ```rust
1275    /// # use hydro_lang::*;
1276    /// # use std::collections::HashSet;
1277    /// # use dfir_rs::futures::StreamExt;
1278    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1279    /// let tick = process.tick();
1280    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1281    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1282    /// stream1.join(stream2)
1283    /// # }, |mut stream| async move {
1284    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1285    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1286    /// # stream.map(|i| assert!(expected.contains(&i)));
1287    /// # }));
1288    pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
1289    where
1290        K: Eq + Hash,
1291    {
1292        check_matching_location(&self.location, &n.location);
1293
1294        Stream::new(
1295            self.location.clone(),
1296            HydroNode::Join {
1297                left: Box::new(self.ir_node.into_inner()),
1298                right: Box::new(n.ir_node.into_inner()),
1299                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1300            },
1301        )
1302    }
1303
1304    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1305    /// computes the anti-join of the items in the input -- i.e. returns
1306    /// unique items in the first input that do not have a matching key
1307    /// in the second input.
1308    ///
1309    /// # Example
1310    /// ```rust
1311    /// # use hydro_lang::*;
1312    /// # use dfir_rs::futures::StreamExt;
1313    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1314    /// let tick = process.tick();
1315    /// let stream = unsafe {
1316    ///    process
1317    ///    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1318    ///    .tick_batch(&tick)
1319    /// };
1320    /// let batch = unsafe {
1321    ///     process
1322    ///         .source_iter(q!(vec![1, 2]))
1323    ///         .tick_batch(&tick)
1324    /// };
1325    /// stream.anti_join(batch).all_ticks()
1326    /// # }, |mut stream| async move {
1327    /// # for w in vec![(3, 'c'), (4, 'd')] {
1328    /// #     assert_eq!(stream.next().await.unwrap(), w);
1329    /// # }
1330    /// # }));
1331    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, Order>
1332    where
1333        K: Eq + Hash,
1334    {
1335        check_matching_location(&self.location, &n.location);
1336
1337        Stream::new(
1338            self.location.clone(),
1339            HydroNode::AntiJoin {
1340                pos: Box::new(self.ir_node.into_inner()),
1341                neg: Box::new(n.ir_node.into_inner()),
1342                metadata: self.location.new_node_metadata::<(K, V1)>(),
1343            },
1344        )
1345    }
1346}
1347
1348impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
1349    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1350    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1351    /// in the second element are accumulated via the `comb` closure.
1352    ///
1353    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1354    /// to depend on the order of elements in the stream.
1355    ///
1356    /// If the input and output value types are the same and do not require initialization then use
1357    /// [`Stream::reduce_keyed`].
1358    ///
1359    /// # Example
1360    /// ```rust
1361    /// # use hydro_lang::*;
1362    /// # use dfir_rs::futures::StreamExt;
1363    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1364    /// let tick = process.tick();
1365    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1366    /// let batch = unsafe { numbers.tick_batch(&tick) };
1367    /// batch
1368    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1369    ///     .all_ticks()
1370    /// # }, |mut stream| async move {
1371    /// // (1, 5), (2, 7)
1372    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1373    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1374    /// # }));
1375    /// ```
1376    pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
1377        self,
1378        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1379        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1380    ) -> Stream<(K, A), Tick<L>, Bounded> {
1381        let init = init.splice_fn0_ctx(&self.location).into();
1382        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1383
1384        Stream::new(
1385            self.location.clone(),
1386            HydroNode::FoldKeyed {
1387                init,
1388                acc: comb,
1389                input: Box::new(self.ir_node.into_inner()),
1390                metadata: self.location.new_node_metadata::<(K, A)>(),
1391            },
1392        )
1393    }
1394
1395    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1396    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1397    /// in the second element are accumulated via the `comb` closure.
1398    ///
1399    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1400    /// to depend on the order of elements in the stream.
1401    ///
1402    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1403    ///
1404    /// # Example
1405    /// ```rust
1406    /// # use hydro_lang::*;
1407    /// # use dfir_rs::futures::StreamExt;
1408    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1409    /// let tick = process.tick();
1410    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1411    /// let batch = unsafe { numbers.tick_batch(&tick) };
1412    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1413    /// # }, |mut stream| async move {
1414    /// // (1, 5), (2, 7)
1415    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1416    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1417    /// # }));
1418    /// ```
1419    pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>(
1420        self,
1421        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1422    ) -> Stream<(K, V), Tick<L>, Bounded> {
1423        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1424
1425        Stream::new(
1426            self.location.clone(),
1427            HydroNode::ReduceKeyed {
1428                f,
1429                input: Box::new(self.ir_node.into_inner()),
1430                metadata: self.location.new_node_metadata::<(K, V)>(),
1431            },
1432        )
1433    }
1434}
1435
1436impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order> {
1437    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1438    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1439    /// in the second element are accumulated via the `comb` closure.
1440    ///
1441    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1442    ///
1443    /// If the input and output value types are the same and do not require initialization then use
1444    /// [`Stream::reduce_keyed_commutative`].
1445    ///
1446    /// # Example
1447    /// ```rust
1448    /// # use hydro_lang::*;
1449    /// # use dfir_rs::futures::StreamExt;
1450    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1451    /// let tick = process.tick();
1452    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1453    /// let batch = unsafe { numbers.tick_batch(&tick) };
1454    /// batch
1455    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1456    ///     .all_ticks()
1457    /// # }, |mut stream| async move {
1458    /// // (1, 5), (2, 7)
1459    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1460    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1461    /// # }));
1462    /// ```
1463    pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
1464        self,
1465        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1466        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1467    ) -> Stream<(K, A), Tick<L>, Bounded, Order> {
1468        let init = init.splice_fn0_ctx(&self.location).into();
1469        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1470
1471        Stream::new(
1472            self.location.clone(),
1473            HydroNode::FoldKeyed {
1474                init,
1475                acc: comb,
1476                input: Box::new(self.ir_node.into_inner()),
1477                metadata: self.location.new_node_metadata::<(K, A)>(),
1478            },
1479        )
1480    }
1481
1482    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1483    /// # Example
1484    /// ```rust
1485    /// # use hydro_lang::*;
1486    /// # use dfir_rs::futures::StreamExt;
1487    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1488    /// let tick = process.tick();
1489    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1490    /// let batch = unsafe { numbers.tick_batch(&tick) };
1491    /// batch.keys().all_ticks()
1492    /// # }, |mut stream| async move {
1493    /// // 1, 2
1494    /// # assert_eq!(stream.next().await.unwrap(), 1);
1495    /// # assert_eq!(stream.next().await.unwrap(), 2);
1496    /// # }));
1497    /// ```
1498    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order> {
1499        self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
1500            .map(q!(|(k, _)| k))
1501    }
1502
1503    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1504    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1505    /// in the second element are accumulated via the `comb` closure.
1506    ///
1507    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1508    ///
1509    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
1510    ///
1511    /// # Example
1512    /// ```rust
1513    /// # use hydro_lang::*;
1514    /// # use dfir_rs::futures::StreamExt;
1515    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1516    /// let tick = process.tick();
1517    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1518    /// let batch = unsafe { numbers.tick_batch(&tick) };
1519    /// batch
1520    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
1521    ///     .all_ticks()
1522    /// # }, |mut stream| async move {
1523    /// // (1, 5), (2, 7)
1524    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1525    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1526    /// # }));
1527    /// ```
1528    pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>(
1529        self,
1530        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1531    ) -> Stream<(K, V), Tick<L>, Bounded, Order> {
1532        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1533
1534        Stream::new(
1535            self.location.clone(),
1536            HydroNode::ReduceKeyed {
1537                f,
1538                input: Box::new(self.ir_node.into_inner()),
1539                metadata: self.location.new_node_metadata::<(K, V)>(),
1540            },
1541        )
1542    }
1543}
1544
1545impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Atomic<L>, B, Order> {
1546    /// Returns a stream corresponding to the latest batch of elements being atomically
1547    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1548    /// the order of the input.
1549    ///
1550    /// # Safety
1551    /// The batch boundaries are non-deterministic and may change across executions.
1552    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order> {
1553        Stream::new(
1554            self.location.clone().tick,
1555            HydroNode::Unpersist {
1556                inner: Box::new(self.ir_node.into_inner()),
1557                metadata: self.location.new_node_metadata::<T>(),
1558            },
1559        )
1560    }
1561
1562    pub fn end_atomic(self) -> Stream<T, L, B, Order> {
1563        Stream::new(self.location.tick.l, self.ir_node.into_inner())
1564    }
1565
1566    pub fn atomic_source(&self) -> Tick<L> {
1567        self.location.tick.clone()
1568    }
1569}
1570
1571impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B, Order> Stream<T, L, B, Order> {
1572    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, Order> {
1573        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
1574    }
1575
1576    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1577    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1578    /// the order of the input.
1579    ///
1580    /// # Safety
1581    /// The batch boundaries are non-deterministic and may change across executions.
1582    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, Order> {
1583        unsafe { self.atomic(tick).tick_batch() }
1584    }
1585
1586    /// Given a time interval, returns a stream corresponding to samples taken from the
1587    /// stream roughly at that interval. The output will have elements in the same order
1588    /// as the input, but with arbitrary elements skipped between samples. There is also
1589    /// no guarantee on the exact timing of the samples.
1590    ///
1591    /// # Safety
1592    /// The output stream is non-deterministic in which elements are sampled, since this
1593    /// is controlled by a clock.
1594    pub unsafe fn sample_every(
1595        self,
1596        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1597    ) -> Stream<T, L, Unbounded, Order> {
1598        let samples = unsafe {
1599            // SAFETY: source of intentional non-determinism
1600            self.location.source_interval(interval)
1601        };
1602
1603        let tick = self.location.tick();
1604        unsafe {
1605            // SAFETY: source of intentional non-determinism
1606            self.tick_batch(&tick)
1607                .continue_if(samples.tick_batch(&tick).first())
1608                .all_ticks()
1609        }
1610    }
1611
1612    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1613    /// stream has not emitted a value since that duration.
1614    ///
1615    /// # Safety
1616    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1617    /// samples take place, timeouts may be non-deterministically generated or missed,
1618    /// and the notification of the timeout may be delayed as well. There is also no
1619    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1620    /// detected based on when the next sample is taken.
1621    pub unsafe fn timeout(
1622        self,
1623        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1624    ) -> Optional<(), L, Unbounded>
1625    where
1626        Order: MinOrder<NoOrder, Min = NoOrder>,
1627    {
1628        let tick = self.location.tick();
1629
1630        let latest_received = self.fold_commutative(
1631            q!(|| None),
1632            q!(|latest, _| {
1633                *latest = Some(Instant::now());
1634            }),
1635        );
1636
1637        unsafe {
1638            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
1639            latest_received.latest_tick(&tick)
1640        }
1641        .filter_map(q!(move |latest_received| {
1642            if let Some(latest_received) = latest_received {
1643                if Instant::now().duration_since(latest_received) > duration {
1644                    Some(())
1645                } else {
1646                    None
1647                }
1648            } else {
1649                Some(())
1650            }
1651        }))
1652        .latest()
1653    }
1654}
1655
1656impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
1657    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
1658        let f = f.splice_fn1_ctx(&self.location).into();
1659        let metadata = self.location.new_node_metadata::<T>();
1660        self.location
1661            .flow_state()
1662            .borrow_mut()
1663            .leaves
1664            .as_mut()
1665            .expect(FLOW_USED_MESSAGE)
1666            .push(HydroLeaf::ForEach {
1667                input: Box::new(HydroNode::Unpersist {
1668                    inner: Box::new(self.ir_node.into_inner()),
1669                    metadata: metadata.clone(),
1670                }),
1671                f,
1672                metadata,
1673            });
1674    }
1675
1676    pub fn dest_sink<S: Unpin + futures::Sink<T> + 'a>(
1677        self,
1678        sink: impl QuotedWithContext<'a, S, L>,
1679    ) {
1680        self.location
1681            .flow_state()
1682            .borrow_mut()
1683            .leaves
1684            .as_mut()
1685            .expect(FLOW_USED_MESSAGE)
1686            .push(HydroLeaf::DestSink {
1687                sink: sink.splice_typed_ctx(&self.location).into(),
1688                input: Box::new(self.ir_node.into_inner()),
1689                metadata: self.location.new_node_metadata::<T>(),
1690            });
1691    }
1692}
1693
1694impl<'a, T, L: Location<'a>, Order> Stream<T, Tick<L>, Bounded, Order> {
1695    pub fn all_ticks(self) -> Stream<T, L, Unbounded, Order> {
1696        Stream::new(
1697            self.location.outer().clone(),
1698            HydroNode::Persist {
1699                inner: Box::new(self.ir_node.into_inner()),
1700                metadata: self.location.new_node_metadata::<T>(),
1701            },
1702        )
1703    }
1704
1705    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, Order> {
1706        Stream::new(
1707            Atomic {
1708                tick: self.location.clone(),
1709            },
1710            HydroNode::Persist {
1711                inner: Box::new(self.ir_node.into_inner()),
1712                metadata: self.location.new_node_metadata::<T>(),
1713            },
1714        )
1715    }
1716
1717    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, Order>
1718    where
1719        T: Clone,
1720    {
1721        Stream::new(
1722            self.location.clone(),
1723            HydroNode::Persist {
1724                inner: Box::new(self.ir_node.into_inner()),
1725                metadata: self.location.new_node_metadata::<T>(),
1726            },
1727        )
1728    }
1729
1730    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, Order> {
1731        Stream::new(
1732            self.location.clone(),
1733            HydroNode::DeferTick {
1734                input: Box::new(self.ir_node.into_inner()),
1735                metadata: self.location.new_node_metadata::<T>(),
1736            },
1737        )
1738    }
1739
1740    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, Order> {
1741        Stream::new(
1742            self.location.clone(),
1743            HydroNode::Delta {
1744                inner: Box::new(self.ir_node.into_inner()),
1745                metadata: self.location.new_node_metadata::<T>(),
1746            },
1747        )
1748    }
1749}
1750
1751pub fn serialize_bincode_with_type(is_demux: bool, t_type: syn::Type) -> syn::Expr {
1752    let root = get_this_crate();
1753
1754    if is_demux {
1755        parse_quote! {
1756            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
1757                |(id, data)| {
1758                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
1759                }
1760            )
1761        }
1762    } else {
1763        parse_quote! {
1764            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
1765                |data| {
1766                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
1767                }
1768            )
1769        }
1770    }
1771}
1772
1773fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
1774    serialize_bincode_with_type(is_demux, stageleft::quote_type::<T>())
1775}
1776
1777pub fn deserialize_bincode_with_type(tagged: Option<syn::Type>, t_type: syn::Type) -> syn::Expr {
1778    let root = get_this_crate();
1779
1780    if let Some(c_type) = tagged {
1781        parse_quote! {
1782            |res| {
1783                let (id, b) = res.unwrap();
1784                (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
1785            }
1786        }
1787    } else {
1788        parse_quote! {
1789            |res| {
1790                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
1791            }
1792        }
1793    }
1794}
1795
1796pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<syn::Type>) -> syn::Expr {
1797    deserialize_bincode_with_type(tagged, stageleft::quote_type::<T>())
1798}
1799
1800impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
1801    pub fn send_bincode<L2: Location<'a>, CoreType>(
1802        self,
1803        other: &L2,
1804    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, Order::Min>
1805    where
1806        L::Root: CanSend<'a, L2, In<CoreType> = T>,
1807        CoreType: Serialize + DeserializeOwned,
1808        Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1809    {
1810        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
1811
1812        let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(L::Root::tagged_type()));
1813
1814        Stream::new(
1815            other.clone(),
1816            HydroNode::Network {
1817                from_key: None,
1818                to_location: other.id(),
1819                to_key: None,
1820                serialize_fn: serialize_pipeline.map(|e| e.into()),
1821                instantiate_fn: DebugInstantiate::Building,
1822                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1823                input: Box::new(self.ir_node.into_inner()),
1824                metadata: other.new_node_metadata::<CoreType>(),
1825            },
1826        )
1827    }
1828
1829    pub fn send_bincode_external<L2: 'a, CoreType>(
1830        self,
1831        other: &ExternalProcess<L2>,
1832    ) -> ExternalBincodeStream<L::Out<CoreType>>
1833    where
1834        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
1835        CoreType: Serialize + DeserializeOwned,
1836        // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
1837    {
1838        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
1839
1840        let metadata = other.new_node_metadata::<CoreType>();
1841
1842        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1843
1844        let external_key = flow_state_borrow.next_external_out;
1845        flow_state_borrow.next_external_out += 1;
1846
1847        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
1848
1849        let dummy_f: syn::Expr = syn::parse_quote!(());
1850
1851        leaves.push(HydroLeaf::ForEach {
1852            f: dummy_f.into(),
1853            input: Box::new(HydroNode::Network {
1854                from_key: None,
1855                to_location: other.id(),
1856                to_key: Some(external_key),
1857                serialize_fn: serialize_pipeline.map(|e| e.into()),
1858                instantiate_fn: DebugInstantiate::Building,
1859                deserialize_fn: None,
1860                input: Box::new(self.ir_node.into_inner()),
1861                metadata: metadata.clone(),
1862            }),
1863            metadata,
1864        });
1865
1866        ExternalBincodeStream {
1867            process_id: other.id,
1868            port_id: external_key,
1869            _phantom: PhantomData,
1870        }
1871    }
1872
1873    pub fn send_bytes<L2: Location<'a>>(
1874        self,
1875        other: &L2,
1876    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, Order::Min>
1877    where
1878        L::Root: CanSend<'a, L2, In<Bytes> = T>,
1879        Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1880    {
1881        let root = get_this_crate();
1882        Stream::new(
1883            other.clone(),
1884            HydroNode::Network {
1885                from_key: None,
1886                to_location: other.id(),
1887                to_key: None,
1888                serialize_fn: None,
1889                instantiate_fn: DebugInstantiate::Building,
1890                deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
1891                    let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
1892                    Some(expr.into())
1893                } else {
1894                    let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
1895                    Some(expr.into())
1896                },
1897                input: Box::new(self.ir_node.into_inner()),
1898                metadata: other.new_node_metadata::<Bytes>(),
1899            },
1900        )
1901    }
1902
1903    pub fn send_bytes_external<L2: 'a>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
1904    where
1905        L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
1906    {
1907        let metadata = other.new_node_metadata::<Bytes>();
1908
1909        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1910        let external_key = flow_state_borrow.next_external_out;
1911        flow_state_borrow.next_external_out += 1;
1912
1913        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
1914
1915        let dummy_f: syn::Expr = syn::parse_quote!(());
1916
1917        leaves.push(HydroLeaf::ForEach {
1918            f: dummy_f.into(),
1919            input: Box::new(HydroNode::Network {
1920                from_key: None,
1921                to_location: other.id(),
1922                to_key: Some(external_key),
1923                serialize_fn: None,
1924                instantiate_fn: DebugInstantiate::Building,
1925                deserialize_fn: None,
1926                input: Box::new(self.ir_node.into_inner()),
1927                metadata: metadata.clone(),
1928            }),
1929            metadata,
1930        });
1931
1932        ExternalBytesPort {
1933            process_id: other.id,
1934            port_id: external_key,
1935        }
1936    }
1937
1938    pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
1939        self,
1940        other: &L2,
1941    ) -> Stream<CoreType, L2, Unbounded, Order::Min>
1942    where
1943        L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
1944        CoreType: Serialize + DeserializeOwned,
1945        Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1946    {
1947        self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
1948    }
1949
1950    pub fn send_bytes_anonymous<L2: Location<'a>, Tag>(
1951        self,
1952        other: &L2,
1953    ) -> Stream<Bytes, L2, Unbounded, Order::Min>
1954    where
1955        L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
1956        Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1957    {
1958        self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
1959    }
1960
1961    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
1962    pub fn broadcast_bincode<C2: 'a>(
1963        self,
1964        other: &Cluster<'a, C2>,
1965    ) -> Stream<
1966        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
1967        Cluster<'a, C2>,
1968        Unbounded,
1969        Order::Min,
1970    >
1971    where
1972        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
1973        T: Clone + Serialize + DeserializeOwned,
1974        Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
1975    {
1976        let ids = other.members();
1977
1978        self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
1979            ::std::clone::Clone::clone(id),
1980            ::std::clone::Clone::clone(&b)
1981        ))))
1982        .send_bincode(other)
1983    }
1984
1985    pub fn broadcast_bincode_anonymous<C2: 'a, Tag>(
1986        self,
1987        other: &Cluster<'a, C2>,
1988    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order::Min>
1989    where
1990        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
1991        T: Clone + Serialize + DeserializeOwned,
1992        Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
1993    {
1994        self.broadcast_bincode(other).map(q!(|(_, b)| b))
1995    }
1996
1997    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
1998    pub fn broadcast_bytes<C2: 'a>(
1999        self,
2000        other: &Cluster<'a, C2>,
2001    ) -> Stream<
2002        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2003        Cluster<'a, C2>,
2004        Unbounded,
2005        Order::Min,
2006    >
2007    where
2008        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2009        T: Clone,
2010        Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2011    {
2012        let ids = other.members();
2013
2014        self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2015            ::std::clone::Clone::clone(id),
2016            ::std::clone::Clone::clone(&b)
2017        ))))
2018        .send_bytes(other)
2019    }
2020
2021    pub fn broadcast_bytes_anonymous<C2: 'a, Tag>(
2022        self,
2023        other: &Cluster<'a, C2>,
2024    ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, Order::Min>
2025    where
2026        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2027            + 'a,
2028        T: Clone,
2029        Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2030    {
2031        self.broadcast_bytes(other).map(q!(|(_, b)| b))
2032    }
2033}
2034
2035#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
2036impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder> {
2037    pub fn round_robin_bincode<C2: 'a>(
2038        self,
2039        other: &Cluster<'a, C2>,
2040    ) -> Stream<
2041        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2042        Cluster<'a, C2>,
2043        Unbounded,
2044        <TotalOrder as MinOrder<
2045            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2046        >>::Min,
2047    >
2048    where
2049        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2050        T: Clone + Serialize + DeserializeOwned,
2051        TotalOrder:
2052            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2053    {
2054        let ids = other.members();
2055
2056        self.enumerate()
2057            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2058            .send_bincode(other)
2059    }
2060
2061    pub fn round_robin_bincode_anonymous<C2: 'a, Tag>(
2062        self,
2063        other: &Cluster<'a, C2>,
2064    ) -> Stream<
2065        T,
2066        Cluster<'a, C2>,
2067        Unbounded,
2068        <TotalOrder as MinOrder<
2069            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2070        >>::Min,
2071    >
2072    where
2073        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2074        T: Clone + Serialize + DeserializeOwned,
2075        TotalOrder:
2076            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2077    {
2078        self.round_robin_bincode(other).map(q!(|(_, b)| b))
2079    }
2080
2081    pub fn round_robin_bytes<C2: 'a>(
2082        self,
2083        other: &Cluster<'a, C2>,
2084    ) -> Stream<
2085        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2086        Cluster<'a, C2>,
2087        Unbounded,
2088        <TotalOrder as MinOrder<
2089            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2090        >>::Min,
2091    >
2092    where
2093        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2094        T: Clone,
2095        TotalOrder:
2096            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2097    {
2098        let ids = other.members();
2099
2100        self.enumerate()
2101            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2102            .send_bytes(other)
2103    }
2104
2105    pub fn round_robin_bytes_anonymous<C2: 'a, Tag>(
2106        self,
2107        other: &Cluster<'a, C2>,
2108    ) -> Stream<
2109        Bytes,
2110        Cluster<'a, C2>,
2111        Unbounded,
2112        <TotalOrder as MinOrder<
2113            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2114        >>::Min,
2115    >
2116    where
2117        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2118            + 'a,
2119        T: Clone,
2120        TotalOrder:
2121            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2122    {
2123        self.round_robin_bytes(other).map(q!(|(_, b)| b))
2124    }
2125}
2126
2127#[cfg(test)]
2128mod tests {
2129    use dfir_rs::futures::StreamExt;
2130    use hydro_deploy::Deployment;
2131    use serde::{Deserialize, Serialize};
2132    use stageleft::q;
2133
2134    use crate::FlowBuilder;
2135    use crate::location::Location;
2136
2137    struct P1 {}
2138    struct P2 {}
2139
2140    #[derive(Serialize, Deserialize, Debug)]
2141    struct SendOverNetwork {
2142        n: u32,
2143    }
2144
2145    #[tokio::test]
2146    async fn first_ten_distributed() {
2147        let mut deployment = Deployment::new();
2148
2149        let flow = FlowBuilder::new();
2150        let first_node = flow.process::<P1>();
2151        let second_node = flow.process::<P2>();
2152        let external = flow.external_process::<P2>();
2153
2154        let numbers = first_node.source_iter(q!(0..10));
2155        let out_port = numbers
2156            .map(q!(|n| SendOverNetwork { n }))
2157            .send_bincode(&second_node)
2158            .send_bincode_external(&external);
2159
2160        let nodes = flow
2161            .with_process(&first_node, deployment.Localhost())
2162            .with_process(&second_node, deployment.Localhost())
2163            .with_external(&external, deployment.Localhost())
2164            .deploy(&mut deployment);
2165
2166        deployment.deploy().await.unwrap();
2167
2168        let mut external_out = nodes.connect_source_bincode(out_port).await;
2169
2170        deployment.start().await.unwrap();
2171
2172        for i in 0..10 {
2173            assert_eq!(external_out.next().await.unwrap().n, i);
2174        }
2175    }
2176}